Communications Mining
latest
false
Importante :
Este contenido se ha traducido mediante traducción automática.
Guía para desarrolladores de Communications Mining
Last updated 19 de jul. de 2024

Obtener datos para Tableau con Python

Este tutorial describe cómo obtener datos de la plataforma Communications Mining en un formato adecuado para importarlos a Tableau o una aplicación de análisis similar, utilizando Python 3.

Importante:

PERMISOS REQUERIDOS PARA ESTE TUTORIAL

  • Ver orígenes
  • Ver etiquetas
  • Administrador de transmisiones
  • Consumir transmisiones

Datos de este tutorial

Datos de este tutorial

Este tutorial mostrará cómo obtener los siguientes datos:

Campos de Communications Mining:

  • predicción de etiqueta para cada etiqueta en la taxonomía (ya sea 0 si la etiqueta no se predice, o un valor entre 0.0 y 1.0)
  • campos generales coincidentes
  • Puntuación de la calidad de servicio (si la calidad de servicio está habilitada para el conjunto de datos)
  • Puntuación de tono (si Tono está habilitado para el conjunto de datos)

Datos del comentario:

  • ID
    • ID de comentario
    • ID de origen
    • ID de mensaje de correo electrónico (ID único proporcionado por Exchange)
    • ID de hilo
  • Asunto del mensaje de correo electrónico
  • Cuerpo del mensaje de correo electrónico
  • correo electrónico DEL remitente
  • lista de destinatarios de correo electrónico A
  • lista de destinatarios de correo electrónico de CC
  • lista de destinatarios de correo electrónico CCO
  • carpeta del buzón (donde estaba el correo electrónico en el momento de la sincronización)
  • Número de archivos adjuntos
  • lista de nombres de archivos adjuntos
  • correo electrónico ENVIADO A LAS

Ejemplo de Python

Ejemplo de Python

Esta sección muestra cómo obtener comentarios de la plataforma Communications Mining y convertirlos a un formato adecuado para Tableau o aplicaciones de análisis similares. Adapte este ejemplo a sus requisitos específicos.

Asegúrate de utilizar Python 3 y de tener instaladas las siguientes bibliotecas:

  • urllib3 y requests (para realizar solicitudes a la API de Communications Mining)
  • pandas (para convertir los datos en marco de datos en el paso final del tutorial)

Configuración

Identifique los recursos necesarios para los siguientes pasos.

  • Su punto final de API
    • Para tenants incorporados a través de UiPath: https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • Para tenants independientes: https://<my_domain>.reinfer.io/api/v1
  • Su token de API
  • Nombre del conjunto de datos del que obtener comentarios
Nombre de la transmisión que se utilizará para obtener comentarios. Debes crear una nueva transmisión o utilizar una transmisión existente.
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
Crea una sesión requests que se utilizará para todas las solicitudes de API. Se recomienda configurarlo para reintentar las solicitudes fallidas (ver ejemplo).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
Los siguientes pasos generan un DownloadError en caso de errores.
class DownloadError(Exception):
passclass DownloadError(Exception):
pass

Taxonomía de etiquetas y campos Generales

Obtener la taxonomía de la etiqueta (es decir, nombres de todas las etiquetas) del conjunto de datos para la versión del modelo utilizada por la transmisión. Esto es necesario para el siguiente paso.

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

Conversión de comentarios

Communications Mining devuelve los comentarios como objetos JSON anidados. Para su uso en Tableau o aplicaciones similares, los objetos JSON anidados deben convertirse a un formato tabular más adecuado.

Define una función de utilidad para convertir un objeto comment de Communications Mining a un formato tabular.

Ten en cuenta que, dado que un comentario puede tener varios del mismo campo general, en este ejemplo todas las entidades coincidentes del mismo tipo se concatenan y se colocan en la misma columna.

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dictdef comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

Establecer punto de inicio de transmisión

De forma predeterminada, una transmisión devolverá comentarios más recientes que su hora de creación. Durante el desarrollo, a menudo es necesario restablecer la transmisión para que comience desde un punto específico en el tiempo.

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

Bucle Obtener-Avanzar

Una transmisión proporciona comentarios en lotes y realiza un seguimiento del último comentario obtenido. Los comentarios se obtienen mediante la ruta de obtención de flujo, y un lote se reconoce mediante la ruta de avance de flujo. Si no se reconoce un lote, la transmisión no pasará a proporcionar el siguiente lote. Por lo tanto, el proceso de obtención de comentarios de Communications Mining se denomina bucle fetch-advance.

Define una función de utilidad que obtenga comentarios repitiendo el bucle fetch-advance hasta que se obtengan todos los comentarios. Para fines de demostración, esta función almacena todos los comentarios obtenidos en la memoria. En un escenario de producción, o en cualquier escenario con una gran cantidad de datos, cada lote de comentarios debe enviarse a un almacén de datos o anexarse a un archivo.

Dado que la transmisión realiza un seguimiento del último comentario obtenido, es seguro detener y reanudar este proceso.

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

En este punto, puedes continuar procesando o almacenando los datos según tus necesidades.

Si necesitas recuperar los mismos datos de nuevo (con fines de prueba), la transmisión debe restablecerse.

¿Te ha resultado útil esta página?

Obtén la ayuda que necesitas
RPA para el aprendizaje - Cursos de automatización
Foro de la comunidad UiPath
Uipath Logo White
Confianza y seguridad
© 2005-2024 UiPath. Todos los derechos reservados.