- Documentos de la API
- Introducción
- Uso de la API
- Tutorial de la API
- Resumen
- Fuentes
- Conjuntos de datos
- Comentarios
- Archivos adjuntos
- Predictions
- Crear una transmisión
- Actualizar una transmisión
- Obtener una transmisión por nombre
- Obtener todas las transmisiones
- Eliminar una transmisión
- Obtener resultados de la transmisión
- Obtener comentarios de una transmisión (heredado)
- Avanzar una transmisión
- Restablecer una transmisión
- Etiquetar una excepción
- Desetiquetar una excepción
- Eventos de auditoría
- Obtener todos los usuarios
- CLI
- Guías de integración
- Blog
- Cómo aprenden las máquinas a entender palabras: una guía para las incrustaciones en PNL
- Aprendizaje basado en solicitudes con Transformers
- Efficient Transformers II: destilación de conocimientos y ajuste
- Transformadores eficientes I: mecanismos de atención
- Modelado de intenciones jerárquico profundo no supervisado: obtener valor sin datos de entrenamiento
- Corrección del sesgo de anotación con Communications Mining
- Aprendizaje activo: mejores modelos ML en menos tiempo
- Todo está en los números: evaluar el rendimiento del modelo con métricas
- Por qué es importante la validación del modelo
- Comparación de Communications Mining y Google AutoML para la inteligencia de datos conversacional
Obtener datos para Tableau con Python
Este tutorial describe cómo obtener datos de la plataforma Communications Mining en un formato adecuado para importarlos a Tableau o una aplicación de análisis similar, utilizando Python 3.
PERMISOS REQUERIDOS PARA ESTE TUTORIAL
- Ver orígenes
- Ver etiquetas
- Administrador de transmisiones
- Consumir transmisiones
Este tutorial mostrará cómo obtener los siguientes datos:
Campos de Communications Mining:
- predicción de etiqueta para cada etiqueta en la taxonomía (ya sea
0
si la etiqueta no se predice, o un valor entre0.0
y1.0
) - campos generales coincidentes
- Puntuación de la calidad de servicio (si la calidad de servicio está habilitada para el conjunto de datos)
- Puntuación de tono (si Tono está habilitado para el conjunto de datos)
Datos del comentario:
- ID
- ID de comentario
- ID de origen
- ID de mensaje de correo electrónico (ID único proporcionado por Exchange)
- ID de hilo
- Asunto del mensaje de correo electrónico
- Cuerpo del mensaje de correo electrónico
- correo electrónico DEL remitente
- lista de destinatarios de correo electrónico A
- lista de destinatarios de correo electrónico de CC
- lista de destinatarios de correo electrónico CCO
- carpeta del buzón (donde estaba el correo electrónico en el momento de la sincronización)
- Número de archivos adjuntos
- lista de nombres de archivos adjuntos
- correo electrónico ENVIADO A LAS
Esta sección muestra cómo obtener comentarios de la plataforma Communications Mining y convertirlos a un formato adecuado para Tableau o aplicaciones de análisis similares. Adapte este ejemplo a sus requisitos específicos.
Asegúrate de utilizar Python 3 y de tener instaladas las siguientes bibliotecas:
urllib3
yrequests
(para realizar solicitudes a la API de Communications Mining)pandas
(para convertir los datos en marco de datos en el paso final del tutorial)
Identifique los recursos necesarios para los siguientes pasos.
- Su punto final de API
- Para tenants incorporados a través de UiPath:
https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
- Para tenants independientes:
https://<my_domain>.reinfer.io/api/v1
- Para tenants incorporados a través de UiPath:
- Su token de API
- Nombre del conjunto de datos del que obtener comentarios
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
requests
que se utilizará para todas las solicitudes de API. Se recomienda configurarlo para reintentar las solicitudes fallidas (ver ejemplo).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
DownloadError
en caso de errores.
class DownloadError(Exception):
pass
class DownloadError(Exception):
pass
Obtener la taxonomía de la etiqueta (es decir, nombres de todas las etiquetas) del conjunto de datos para la versión del modelo utilizada por la transmisión. Esto es necesario para el siguiente paso.
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
Communications Mining devuelve los comentarios como objetos JSON anidados. Para su uso en Tableau o aplicaciones similares, los objetos JSON anidados deben convertirse a un formato tabular más adecuado.
comment
de Communications Mining a un formato tabular.
Ten en cuenta que, dado que un comentario puede tener varios del mismo campo general, en este ejemplo todas las entidades coincidentes del mismo tipo se concatenan y se colocan en la misma columna.
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
De forma predeterminada, una transmisión devolverá comentarios más recientes que su hora de creación. Durante el desarrollo, a menudo es necesario restablecer la transmisión para que comience desde un punto específico en el tiempo.
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
Una transmisión proporciona comentarios en lotes y realiza un seguimiento del último comentario obtenido. Los comentarios se obtienen mediante la ruta de obtención de flujo, y un lote se reconoce mediante la ruta de avance de flujo. Si no se reconoce un lote, la transmisión no pasará a proporcionar el siguiente lote. Por lo tanto, el proceso de obtención de comentarios de Communications Mining se denomina bucle fetch-advance.
Define una función de utilidad que obtenga comentarios repitiendo el bucle fetch-advance hasta que se obtengan todos los comentarios. Para fines de demostración, esta función almacena todos los comentarios obtenidos en la memoria. En un escenario de producción, o en cualquier escenario con una gran cantidad de datos, cada lote de comentarios debe enviarse a un almacén de datos o anexarse a un archivo.
Dado que la transmisión realiza un seguimiento del último comentario obtenido, es seguro detener y reanudar este proceso.
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
En este punto, puedes continuar procesando o almacenando los datos según tus necesidades.
Si necesitas recuperar los mismos datos de nuevo (con fines de prueba), la transmisión debe restablecerse.