- Documentos da API
- CLI
- Guias de integração
- Blog
- Como as máquinas aprendem a entender as palavras: um guia para incorporações ao NLP
- Aprendizado baseado em solicitação com Transformers
- Efficient Transformers II: extração de conhecimento e ajustes finos
- Transformers eficientes I: mecanismos de atenção
- Modelagem de intenção hierárquica profunda não supervisionada: obtenção de valor sem dados de treinamento
- Corrigindo viés de anotação com Communications Mining
- Aprendizado ativo: melhores modelos de ML em menos tempo
- Está tudo nos números - avaliando o desempenho do modelo com métricas
- Por que a validação de modelos é importante
- Comparação do Communications Mining e do Google AutoML para inteligência de dados conversacional
Como buscar dados para o Tableau com o Python
Este tutorial descreve como obter dados da plataforma Communications Mining em um formato adequado para importação para o Tableau ou aplicativo de análise semelhante, usando o Python 3.
Permissões necessárias para este TUTORIAL
- Ver Origens
- Exibir rótulos
- Administrador de transmissões
- Consumir transmissões
Este tutorial mostrará como buscar os seguintes dados:
Campos do Communications Mining:
- previsão de rótulo para cada rótulo na taxonomia (
0
, se o rótulo não for previsto ou um valor entre0.0
e1.0
) - campos gerais correspondentes
- Pontuação de Qualidade do Serviço (se a Qualidade do Serviço estiver habilitado para o conjunto de dados)
- Pontuação de tom (se Tom estiver habilitado para o conjunto de dados)
Dados do comentário:
- IDs
- ID do comentário
- ID de origem
- ID da Mensagem de email (ID exclusivo fornecido pelo Exchange)
- ID da thread
- Assunto do e-mail
- Corpo do e-mail
- email FROM remetente
- lista de destinatários de emails
- lista de destinatários de e-mail em CC
- lista de destinatários de e-mail em BCC
- pasta da caixa de correio (onde o e-mail estava no momento em que foi sincronizado)
- Número de Anexos
- lista de nomes de anexos
- email SENT AT carimbo de data/hora
Esta seção demonstra como buscar comentários da plataforma Communications Mining e convertê-los em um formato adequado para o Tableau ou aplicativos de análise semelhantes. Adapte este exemplo para os seus requisitos específicos.
Verifique se você está usando o Python 3 e tem as seguintes bibliotecas instaladas:
urllib3
erequests
(para fazer solicitações à API do Communications Mining)pandas
(para converter os dados para dataframe na etapa final do tutorial)
Identifique os recursos necessários para as etapas a seguir.
- Ponto de extremidade da sua API
- Para tenants integrados via UiPath:
https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
- Para tenants autônomos:
https://<my_domain>.reinfer.io/api/v1
- Para tenants integrados via UiPath:
- Token da sua API
- Nome do conjunto de dados do qual buscar comentários
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
requests
sessão que será usada para todas as solicitações da API. É recomendável que você a configure para repetir as solicitações com falha (consulte o exemplo).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
DownloadError
em caso de erros.
class DownloadError(Exception):
pass
class DownloadError(Exception):
pass
Obter a taxonomia do rótulo (ou seja, nomes de todos os rótulos) do conjunto de dados para a versão do modelo usado pelo fluxo. Isso é necessário para a próxima etapa.
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
O Communications Mining retorna comentários como objetos JSON aninhados. Para uso no Tableau ou aplicativos semelhantes, os objetos JSON aninhados precisam ser convertidos em um formato tabular mais adequado.
comment
do Communications Mining para um formato tabular.
Observe que, como um comentário pode ter vários do mesmo campo geral, neste exemplo, todas as entidades correspondentes do mesmo tipo são concatenadas e colocadas na mesma coluna.
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
Por padrão, um fluxo retornará comentários mais recentes do que seu horário de criação. Durante o desenvolvimento, frequentemente é necessário redefinir o fluxo para que ele seja iniciado a partir de um ponto específico no tempo.
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
Um fluxo fornece comentários em lotes e mantém o controle do último comentário buscado. Os comentários são buscados usando a rota de busca de fluxo e um lote é confirmado usando a rota de avançar de fluxo. Se um lote não for confirmado, o fluxo não passará para fornecer o próximo lote. Portanto, o processo de buscar comentários do Communications Mining é chamado de loop fetch-advance.
Defina uma função utilitária que busque comentários repetindo o loop fetch-advance até todos os comentários serem buscados. Para fins de demonstração, esta função armazena todos os comentários obtidos na memória. Em um cenário de produção ou qualquer cenário com uma grande quantidade de dados, cada lote de comentários deve ser enviado para um armazenamento de dados ou anexado a um arquivo.
Como o fluxo rastreia o último comentário obtido, é seguro interromper e retomar esse processo.
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
Neste ponto, você pode continuar com o processamento ou armazenamento dos dados, de acordo com suas necessidades.
Se você precisar buscar os mesmos dados novamente (para fins de teste), o fluxo precisará ser redefinido.