UiPath Documentation
ixp
latest
false
Wichtig :
Es kann 1–2 Wochen dauern, bis die Lokalisierung neu veröffentlichter Inhalte verfügbar ist.

Communications Mining-Benutzerhandbuch

Abrufen von Daten für Tableau mit Python

In diesem Tutorial wird beschrieben, wie Daten aus der Communications Mining™-Plattform in ein Format abgerufen werden, das für den Import in Tableau oder eine ähnliche Analyseanwendung mit Python 3 geeignet ist.

Wichtig:

Für dieses Tutorial erforderliche Berechtigungen:

  • Quellen anzeigen
  • Beschriftungen anzeigen
  • Streams-Administrator
  • Streams verbrauchen

Daten in diesem Tutorial

Dieses Tutorial zeigt, wie die folgenden Daten abgerufen werden:

Communications Mining™-Felder:

  • Bezeichnungsvorhersage für jede Bezeichnung in der Taxonomie (entweder 0 , wenn die Bezeichnung nicht vorhergesagt wird, oder ein Wert zwischen 0.0 und 1.0).
  • übereinstimmende allgemeine Felder
  • Dienstgüte-Bewertung (wenn die Dienstgüte für das Dataset aktiviert ist)
  • Stimmungsbewertung (wenn Tone für das Dataset aktiviert ist)

Kommentardaten:

  • IDs
    • Kommentar-ID
    • Quell-ID
    • email Nachrichten-ID (eindeutige ID, die von Exchange bereitgestellt wird)
    • Thread ID
  • E-Mail-Betreff
  • E-Mail-Text
  • email FROM Absender
  • Liste der E-Mail-TO-Empfänger
  • Liste der E-Mail-CC-Empfänger
  • der BCC-Empfänger von E-Mails
  • Postfachordner (in dem sich die E-Mail zum Zeitpunkt der Synchronisierung befand)
  • Anzahl der Anlagen
  • Liste der Namen der Anhänge
  • E-Mail SENT AT Zeitstempel

Python-Beispiel

This section demonstrates how to fetch comments from the Communications Mining™ platform and convert them into a format suitable for Tableau or similar analytics applications. Adapt this example to your specific requirements.

Make sure you are using Python 3 and have the following libraries installed:

  • urllib3 und requests (für Anfragen an die Communications Mining API)
  • pandas (zum Konvertieren der Daten in Dataframe im letzten Schritt des Tutorials)

Einrichten

Identify the following resources before you proceed:

  • Ihr API-Endpunkt
    • Für Mandanten mit Onboarding über UiPath®: https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • Für eigenständige Mandanten: https://<my_domain>.reinfer.io/api/v1
  • Ihr API-Token
  • Name des Datasets , aus dem Kommentare abgerufen werden sollen
  • Name of the stream to use for fetching comments. Either create a new stream or use an existing stream.

Assign your resource values to the following constants:

API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"

Erstellen Sie eine requests -Sitzung, die für alle API-Anforderungen verwendet wird. Es wird empfohlen, sie so zu konfigurieren, dass fehlgeschlagene Anforderungen wiederholt werden. Weitere Informationen finden Sie im Beispiel.

from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, check
# https://requests.readthedocs.io/en/latest/user/advanced/#proxies
# on how to configure a proxy for your `requests` session.
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, check
# https://requests.readthedocs.io/en/latest/user/advanced/#proxies
# on how to configure a proxy for your `requests` session.

Define a custom exception class to handle API errors in subsequent steps. All API calls in this tutorial raise a DownloadError when the response is not successful:

class DownloadError(Exception):
    pass
class DownloadError(Exception):
    pass

Use this exception in if not response.ok checks throughout the following code to surface connection and API failures.

Beschriften Sie die Felder Taxonomie und Allgemein

Rufen Sie die Bezeichnungstaxonomie ab (d. h Namen aller Bezeichnungen) des Datasets für die Modellversion, die vom Stream verwendet wird. Dies wird für den nächsten Schritt benötigt.

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()
# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

Kommentarkonvertierung

Communications Mining™ gibt Kommentare als verschachtelte JSON-Objekte zurück. Für die Verwendung in Tableau oder ähnlichen Anwendungen müssen die verschachtelten JSON-Objekte in ein geeigneteres Tabellenformat konvertiert werden.

Definieren Sie eine Dienstprogrammfunktion, um ein Communications Mining comment -Objekt in ein Tabellenformat zu konvertieren.

Hinweis:

Da ein Kommentar mehrere desselben allgemeinen Felds haben kann, werden in diesem Beispiel alle übereinstimmenden Entitäten desselben Typs verkettet und in derselben Spalte hinzugefügt.

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

Legen Sie den Startpunkt des Streams fest

Standardmäßig gibt ein Stream Kommentare zurück, die neuer als seine Erstellungszeit sind. Während der Entwicklung ist es oft erforderlich, den Stream zurückzusetzen, um ab einem bestimmten Zeitpunkt zu starten.

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)
STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

Abruf-Vorwärts-Schleife

Ein Stream stellt Kommentare in Batches bereit und verfolgt den zuletzt abgerufenen Kommentar. Kommentare werden mithilfe der Stream-Abruf-Route abgerufen und ein Batch wird mithilfe der Stream Advance-Route bestätigt. Wenn ein Batch nicht bestätigt wird, wird der Stream nicht mit der Bereitstellung des nächsten Batches fortfahren. Daher wird der Prozess zum Abrufen von Kommentaren aus Communications Mining™ als Abrufschleife bezeichnet.

Definieren Sie eine Dienstprogrammfunktion, die Kommentare abruft, indem Sie die abrufende Schleife wiederholen, bis alle Kommentare abgerufen sind. Zu Demonstrationszwecken speichert diese Funktion alle abgerufenen Kommentare im Arbeitsspeicher. In einem Produktionsszenario oder einem Szenario mit großen Datenmengen sollte jeder Batch von Kommentaren in einen Datenspeicher übertragen oder stattdessen an eine Datei angehängt werden.

Da der Stream den zuletzt abgerufenen Kommentar nachverfolgt, kann dieser Prozess sicher angehalten und fortgesetzt werden.

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`
import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

An diesem Punkt können Sie mit der Verarbeitung oder Speicherung der Daten entsprechend Ihren Anforderungen fortfahren.

Wenn Sie dieselben Daten erneut abrufen müssen (zu Testzwecken), muss der Stream zurückgesetzt werden .

War diese Seite hilfreich?

Verbinden

Benötigen Sie Hilfe? Support

Möchten Sie lernen? UiPath Academy

Haben Sie Fragen? UiPath-Forum

Auf dem neuesten Stand bleiben