Communications Mining
Plus récente (Latest)
False
Guide du développeur Communications Mining
Last updated 17 mai 2024

Récupérer des données pour Tableau avec Python

Ce tutoriel explique comment récupérer des données de la plate-forme Communications Mining dans un format adapté à l'importation dans Tableau ou dans une application d'analyse similaire, à l'aide de Python 3.

Important :

Autorisations requises pour ce tutoriel

  • Afficher les sources
  • Afficher les libellés
  • Administrateur de flux
  • Utiliser des flux

Données de ce didacticiel

Données de ce didacticiel

Ce tutoriel vous montrera comment récupérer les données suivantes :

Champs Communications Mining :

  • prédiction de libellé pour chaque libellé dans la taxonomie (soit 0 si le libellé n'est pas prédit, soit une valeur comprise entre 0.0 et 1.0)
  • entités correspondantes
  • Score de qualité de service (si la qualité de service est activée pour l'ensemble de données)
  • Score Tone (si Tone est activé pour l'ensemble de données)

Données du commentaire :

  • ID
    • Commentaire - ID
    • ID source
    • ID du message de l'e-mail (ID unique fourni par Exchange)
    • ID de fil de discussion
  • Objet de l’e-mail
  • Corps de l'e-mail
  • E-mail DE L’expéditeur
  • liste des e-mails À destinataires
  • liste des destinataires Cc des e-mails
  • liste des destinataires Cci des e-mails
  • dossier de la boîte aux lettres (où se trouvait l'e-mail au moment de sa synchronisation)
  • Nombre de pièces jointes
  • liste de noms de pièces jointes
  • e-mail Envoyé à l'horodatage

Exemple Python

Exemple Python

Cette section explique comment récupérer les commentaires de la plate-forme Communications Mining et les convertir en un format adapté à Tableau ou à des applications d'analyse similaires. Veuillez adapter cet exemple à vos besoins spécifiques.

Assurez-vous que vous utilisez Python 3 et que les bibliothèques suivantes sont installées :

  • urllib3 et requests (pour effectuer des requêtes à l'API Communications Mining)
  • pandas (pour convertir les données en dataframe lors de la dernière étape du tutoriel)

Configuration

Identifiez les ressources nécessaires pour les étapes suivantes.

  • Le point de terminaison de votre API
    • Pour les locataires intégrés via UiPath : https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • Pour les locataires autonomes : https://<my_domain>.reinfer.io/api/v1
  • Votre jeton API
  • Nom de l'ensemble de données à partir duquel récupérer les commentaires
Nom du flux à utiliser pour récupérer les commentaires. Vous devez soit créer un nouveau flux, soit utiliser un flux existant.
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
Créez une session requests qui sera utilisée pour toutes les requêtes d'API. Il est recommandé de le configurer pour réessayer les requêtes ayant échoué (voir l'exemple).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
Les étapes suivantes génèrent un DownloadError en cas d'erreur.
class DownloadError(Exception):
passclass DownloadError(Exception):
pass

Taxonomie et entités des libellés

Obtenir la taxonomie du libellé (c'est-à-dire les noms de tous les libellés) du jeu de données de la version du modèle utilisée par le flux. Ceci est nécessaire pour l'étape suivante.

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

Conversion des commentaires

Communications Mining renvoie les commentaires en tant qu'objets JSON imbriqués. Pour être utilisés dans Tableau ou dans des applications similaires, les objets JSON imbriqués doivent être convertis en un format tabulaire plus approprié.

Définissez une fonction utilitaire pour convertir un objet comment de Communications Mining en format tabulaire.

Notez que, étant donné qu'un commentaire peut avoir plusieurs entités de la même entité, dans cet exemple, toutes les entités correspondantes du même type sont concaténées et placées dans la même colonne.

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dictdef comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

Définir le point de départ du flux

Par défaut, un flux renverra des commentaires plus récents que son heure de création. Au cours du développement, il est souvent nécessaire de réinitialiser le flux pour démarrer à partir d'un moment donné.

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

Boucle Récupérer-Avancer

Un flux fournit des commentaires par lots et garde une trace du dernier commentaire récupéré. Les commentaires sont récupérés à l'aide de la route de récupération du flux, et un lot est reconnu à l'aide de la route avancée du flux. Si un lot n'est pas confirmé, le flux ne passera pas au lot suivant. Par conséquent, le processus de récupération des commentaires à partir de Communications Mining est appelé boucle de récupération-avance.

Définissez une fonction utilitaire qui récupère les commentaires en répétant la boucle de récupération-avance jusqu'à ce que tous les commentaires soient récupérés. À des fins de démonstration, cette fonction stocke tous les commentaires récupérés en mémoire. Dans un scénario de production, ou tout scénario comportant une grande quantité de données, chaque lot de commentaires doit être plutôt transmis à un magasin de données ou ajouté à un fichier.

Étant donné que le flux garde une trace du dernier commentaire récupéré, il est conseillé d'arrêter et de reprendre ce processus en toute sécurité.

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

À ce stade, vous pouvez poursuivre le traitement ou le stockage des données en fonction de vos besoins.

Si vous devez récupérer les mêmes données à nouveau (à des fins de test), le flux doit être réinitialisé.

Cette page vous a-t-elle été utile ?

Obtenez l'aide dont vous avez besoin
Formation RPA - Cours d'automatisation
Forum de la communauté UiPath
Logo Uipath blanc
Confiance et sécurité
© 2005-2024 UiPath. All rights reserved.