- Documents d’API
- CLI
- Guides d'intégration
- Intégration avec l'utilisateur du service Azure
- Intégration avec l'authentification d'application Azure
- Automatisation en temps réel
- Récupérer des données pour Tableau avec Python
- Intégration d'Elasticsearch
- Intégration EWS auto-hébergée
- Infrastructure d'automatisation UiPath
- Activités UiPath Marketplace
- Activités officielles UiPath
- Blog
- Comment les machines apprennent à comprendre les mots : guide d'intégration dans NLP
- Apprentissage basé sur des invites avec des Transformers
- Efficient Transformers II : Dilarisation des connaissances et affinement
- Transformateurs efficaces I : mécanismes d'attention
- Modélisation de l'intention hiérarchique profonde non supervisée : obtenir de la valeur sans données d'entraînement
- Correction du biais d'annotation avec Communications Mining
- Apprentissage actif : de meilleurs modèles d'ML en moins de temps
- Tout est dans les chiffres : évaluer les performances du modèle avec des métriques
- Pourquoi la validation du modèle est importante
- Comparaison de Communications Mining et de Google AutoML pour l'intelligence des données conversationnelles
Récupérer des données pour Tableau avec Python
Ce tutoriel explique comment récupérer des données de la plate-forme Communications Mining dans un format adapté à l'importation dans Tableau ou dans une application d'analyse similaire, à l'aide de Python 3.
Autorisations requises pour ce tutoriel
- Afficher les sources
- Afficher les libellés
- Administrateur de flux
- Utiliser des flux
Ce tutoriel vous montrera comment récupérer les données suivantes :
Champs Communications Mining :
- prédiction de libellé pour chaque libellé dans la taxonomie (soit
0
si le libellé n'est pas prédit, soit une valeur comprise entre0.0
et1.0
) - champs généraux correspondants
- Score de qualité de service (si la qualité de service est activée pour l'ensemble de données)
- Score Tone (si Tone est activé pour l'ensemble de données)
Données du commentaire :
- ID
- Commentaire - ID
- ID source
- ID du message de l'e-mail (ID unique fourni par Exchange)
- ID de fil de discussion
- Objet de l’e-mail
- Corps de l'e-mail
- E-mail DE L’expéditeur
- liste des e-mails À destinataires
- liste des destinataires Cc des e-mails
- liste des destinataires Cci des e-mails
- dossier de la boîte aux lettres (où se trouvait l'e-mail au moment de sa synchronisation)
- Nombre de pièces jointes
- liste de noms de pièces jointes
- e-mail Envoyé à l'horodatage
Cette section explique comment récupérer les commentaires de la plate-forme Communications Mining et les convertir en un format adapté à Tableau ou à des applications d'analyse similaires. Veuillez adapter cet exemple à vos besoins spécifiques.
Assurez-vous que vous utilisez Python 3 et que les bibliothèques suivantes sont installées :
urllib3
etrequests
(pour effectuer des requêtes à l'API Communications Mining)pandas
(pour convertir les données en dataframe lors de la dernière étape du tutoriel)
Identifiez les ressources nécessaires pour les étapes suivantes.
- Le point de terminaison de votre API
- Pour les locataires intégrés via UiPath :
https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
- Pour les locataires autonomes :
https://<my_domain>.reinfer.io/api/v1
- Pour les locataires intégrés via UiPath :
- Votre jeton API
- Nom de l'ensemble de données à partir duquel récupérer les commentaires
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
requests
qui sera utilisée pour toutes les requêtes d'API. Il est recommandé de le configurer pour réessayer les requêtes ayant échoué (voir l'exemple).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
DownloadError
en cas d'erreur.
class DownloadError(Exception):
pass
class DownloadError(Exception):
pass
Obtenir la taxonomie du libellé (c'est-à-dire les noms de tous les libellés) du jeu de données de la version du modèle utilisée par le flux. Ceci est nécessaire pour l'étape suivante.
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
Communications Mining renvoie les commentaires en tant qu'objets JSON imbriqués. Pour être utilisés dans Tableau ou dans des applications similaires, les objets JSON imbriqués doivent être convertis en un format tabulaire plus approprié.
comment
de Communications Mining en format tabulaire.
Notez que, étant donné qu'un commentaire peut avoir plusieurs éléments du même champ général, dans cet exemple, toutes les entités correspondantes du même type sont concaténées et placées dans la même colonne.
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
Par défaut, un flux renverra des commentaires plus récents que son heure de création. Au cours du développement, il est souvent nécessaire de réinitialiser le flux pour démarrer à partir d'un moment donné.
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
Un flux fournit des commentaires par lots et garde une trace du dernier commentaire récupéré. Les commentaires sont récupérés à l'aide de la route de récupération du flux, et un lot est reconnu à l'aide de la route avancée du flux. Si un lot n'est pas confirmé, le flux ne passera pas au lot suivant. Par conséquent, le processus de récupération des commentaires à partir de Communications Mining est appelé boucle de récupération-avance.
Définissez une fonction utilitaire qui récupère les commentaires en répétant la boucle de récupération-avance jusqu'à ce que tous les commentaires soient récupérés. À des fins de démonstration, cette fonction stocke tous les commentaires récupérés en mémoire. Dans un scénario de production, ou tout scénario comportant une grande quantité de données, chaque lot de commentaires doit être plutôt transmis à un magasin de données ou ajouté à un fichier.
Étant donné que le flux garde une trace du dernier commentaire récupéré, il est conseillé d'arrêter et de reprendre ce processus en toute sécurité.
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
À ce stade, vous pouvez poursuivre le traitement ou le stockage des données en fonction de vos besoins.
Si vous devez récupérer les mêmes données à nouveau (à des fins de test), le flux doit être réinitialisé.