communications-mining
latest
false
Importante :
Este conteúdo foi traduzido com auxílio de tradução automática.
Guia do desenvolvedor do Communications Mining
Last updated 3 de out de 2024

Como buscar dados para o Tableau com o Python

Este tutorial descreve como obter dados da plataforma Communications Mining em um formato adequado para importação para o Tableau ou aplicativo de análise semelhante, usando o Python 3.

Importante:

Permissões necessárias para este TUTORIAL

  • Ver Origens
  • Exibir rótulos
  • Administrador de transmissões
  • Consumir transmissões

Dados neste tutorial

Dados neste tutorial

Este tutorial mostrará como buscar os seguintes dados:

Campos do Communications Mining:

  • previsão de rótulo para cada rótulo na taxonomia ( 0 , se o rótulo não for previsto ou um valor entre 0.0 e 1.0)
  • campos gerais correspondentes
  • Pontuação de Qualidade do Serviço (se a Qualidade do Serviço estiver habilitado para o conjunto de dados)
  • Pontuação de tom (se Tom estiver habilitado para o conjunto de dados)

Dados do comentário:

  • IDs
    • ID do comentário
    • ID de origem
    • ID da Mensagem de email (ID exclusivo fornecido pelo Exchange)
    • ID da thread
  • Assunto do e-mail
  • Corpo do e-mail
  • email FROM remetente
  • lista de destinatários de emails
  • lista de destinatários de e-mail em CC
  • lista de destinatários de e-mail em BCC
  • pasta da caixa de correio (onde o e-mail estava no momento em que foi sincronizado)
  • Número de Anexos
  • lista de nomes de anexos
  • email SENT AT carimbo de data/hora

Exemplo em Python

Exemplo em Python

Esta seção demonstra como buscar comentários da plataforma Communications Mining e convertê-los em um formato adequado para o Tableau ou aplicativos de análise semelhantes. Adapte este exemplo para os seus requisitos específicos.

Verifique se você está usando o Python 3 e tem as seguintes bibliotecas instaladas:

  • urllib3 e requests (para fazer solicitações à API do Communications Mining)
  • pandas (para converter os dados para dataframe na etapa final do tutorial)

Configuração

Identifique os recursos necessários para as etapas a seguir.

  • Ponto de extremidade da sua API
    • Para tenants integrados via UiPath: https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • Para tenants autônomos: https://<my_domain>.reinfer.io/api/v1
  • Token da sua API
  • Nome do conjunto de dados do qual buscar comentários
Nome do fluxo a ser usado para buscar comentários. Você deve criar um novo fluxo ou usar um fluxo existente.
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
Crie uma requests sessão que será usada para todas as solicitações da API. É recomendável que você a configure para repetir as solicitações com falha (consulte o exemplo).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
As etapas a seguir geram um DownloadError em caso de erros.
class DownloadError(Exception):
    passclass DownloadError(Exception):
    pass

Taxonomia do rótulo e campos gerais

Obter a taxonomia do rótulo (ou seja, nomes de todos os rótulos) do conjunto de dados para a versão do modelo usado pelo fluxo. Isso é necessário para a próxima etapa.

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

Conversão de comentário

O Communications Mining retorna comentários como objetos JSON aninhados. Para uso no Tableau ou aplicativos semelhantes, os objetos JSON aninhados precisam ser convertidos em um formato tabular mais adequado.

Defina uma função de utilitário para converter um objeto comment do Communications Mining para um formato tabular.

Observe que, como um comentário pode ter vários do mesmo campo geral, neste exemplo, todas as entidades correspondentes do mesmo tipo são concatenadas e colocadas na mesma coluna.

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dictdef comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

Definir ponto de partida do stream

Por padrão, um fluxo retornará comentários mais recentes do que seu horário de criação. Durante o desenvolvimento, frequentemente é necessário redefinir o fluxo para que ele seja iniciado a partir de um ponto específico no tempo.

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

Loop de busca avançada

Um fluxo fornece comentários em lotes e mantém o controle do último comentário buscado. Os comentários são buscados usando a rota de busca de fluxo e um lote é confirmado usando a rota de avançar de fluxo. Se um lote não for confirmado, o fluxo não passará para fornecer o próximo lote. Portanto, o processo de buscar comentários do Communications Mining é chamado de loop fetch-advance.

Defina uma função utilitária que busque comentários repetindo o loop fetch-advance até todos os comentários serem buscados. Para fins de demonstração, esta função armazena todos os comentários obtidos na memória. Em um cenário de produção ou qualquer cenário com uma grande quantidade de dados, cada lote de comentários deve ser enviado para um armazenamento de dados ou anexado a um arquivo.

Como o fluxo rastreia o último comentário obtido, é seguro interromper e retomar esse processo.

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

Neste ponto, você pode continuar com o processamento ou armazenamento dos dados, de acordo com suas necessidades.

Se você precisar buscar os mesmos dados novamente (para fins de teste), o fluxo precisará ser redefinido.

Esta página foi útil?

Obtenha a ajuda que você precisa
Aprendendo RPA - Cursos de automação
Fórum da comunidade da Uipath
Uipath Logo White
Confiança e segurança
© 2005-2024 UiPath. Todos os direitos reservados.