communications-mining
latest
false
Wichtig :
Dieser Inhalt wurde maschinell übersetzt.
Communications Mining-Entwicklerhandbuch
Last updated 27. Sep. 2024

Abrufen von Daten für Tableau mit Python

In diesem Tutorial wird beschrieben, wie Daten von der Communications Mining-Plattform in ein Format abgerufen werden, das für den Import in Tableau oder eine ähnliche Analyseanwendung geeignet ist, indem Python 3 verwendet wird.

Wichtig:

Berechtigungen, die für dieses Tutorial erforderlich sind

  • Quellen anzeigen
  • Beschriftungen anzeigen
  • Streams-Administrator
  • Streams verbrauchen

Daten in diesem Tutorial

Daten in diesem Tutorial

Dieses Tutorial zeigt, wie die folgenden Daten abgerufen werden:

Communications Mining-Felder:

  • Bezeichnungsvorhersage für jede Bezeichnung in der Taxonomie (entweder 0 , wenn die Bezeichnung nicht vorhergesagt wird, oder ein Wert zwischen 0.0 und 1.0).
  • übereinstimmende allgemeine Felder
  • Dienstgüte-Bewertung (wenn die Dienstgüte für das Dataset aktiviert ist)
  • Stimmungsbewertung (wenn Tone für das Dataset aktiviert ist)

Kommentardaten:

  • IDs
    • Kommentar-ID
    • Quell-ID
    • email Nachrichten-ID (eindeutige ID, die von Exchange bereitgestellt wird)
    • Thread ID
  • E-Mail-Betreff
  • E-Mail-Text
  • email FROM Absender
  • Liste der E-Mail-TO-Empfänger
  • Liste der E-Mail-CC-Empfänger
  • der BCC-Empfänger von E-Mails
  • Postfachordner (in dem sich die E-Mail zum Zeitpunkt der Synchronisierung befand)
  • Anzahl der Anlagen
  • Liste der Namen der Anhänge
  • E-Mail SENT AT Zeitstempel

Python-Beispiel

Python-Beispiel

In diesem Abschnitt wird veranschaulicht, wie Kommentare von der Communications Mining-Plattform abgerufen und in ein Format konvertiert werden, das für Tableau oder ähnliche Analyseanwendungen geeignet ist. Bitte passen Sie dieses Beispiel an Ihre spezifischen Anforderungen an.

Stellen Sie sicher, dass Sie Python 3 verwenden und die folgenden Bibliotheken installiert haben:

  • urllib3 und requests (für Anfragen an die Communications Mining API)
  • pandas (zum Konvertieren der Daten in Dataframe im letzten Schritt des Tutorials)

Einrichten

Identifizieren Sie die für die folgenden Schritte benötigten Ressourcen.

  • Ihr API-Endpunkt
    • Für Mandanten, deren Onboarding über UiPath erfolgt ist: https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • Für eigenständige Mandanten: https://<my_domain>.reinfer.io/api/v1
  • Ihr API-Token
  • Name des Datasets , aus dem Kommentare abgerufen werden sollen
Name des Streams , der zum Abrufen von Kommentaren verwendet werden soll. Sie sollten entweder einen neuen Stream erstellen oder einen vorhandenen Stream verwenden.
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
Erstellen Sie eine requests -Sitzung, die für alle API-Anforderungen verwendet wird. Es wird empfohlen, sie so zu konfigurieren, dass fehlgeschlagene Anforderungen wiederholt werden (siehe Beispiel).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
Die folgenden Schritte lösen bei Fehlern eine DownloadError aus.
class DownloadError(Exception):
    passclass DownloadError(Exception):
    pass

Beschriften Sie die Felder Taxonomie und Allgemein

Rufen Sie die Bezeichnungstaxonomie ab (d. h Namen aller Bezeichnungen) des Datasets für die Modellversion, die vom Stream verwendet wird. Dies wird für den nächsten Schritt benötigt.

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

Kommentarkonvertierung

Communications Mining gibt Kommentare als verschachtelte JSON-Objekte zurück. Zur Verwendung in Tableau oder ähnlichen Anwendungen müssen die geschachtelten JSON-Objekte in ein geeigneteres Tabellenformat konvertiert werden.

Definieren Sie eine Dienstprogrammfunktion, um ein Communications Mining comment -Objekt in ein Tabellenformat zu konvertieren.

Beachten Sie, dass ein Kommentar mehrere gleiche Entitäten desselben allgemeinen Felds haben kann. In diesem Beispiel werden alle übereinstimmenden Entitäten desselben Typs verkettet und in dieselbe Spalte gesetzt.

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dictdef comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

Legen Sie den Startpunkt des Streams fest

Standardmäßig gibt ein Stream Kommentare zurück, die neuer als seine Erstellungszeit sind. Während der Entwicklung ist es oft erforderlich, den Stream zurückzusetzen, um ab einem bestimmten Zeitpunkt zu starten.

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

Abruf-Vorwärts-Schleife

Ein Stream stellt Kommentare in Batches bereit und verfolgt den zuletzt abgerufenen Kommentar. Kommentare werden über die Route des Stream-Abrufs abgerufen und ein Batch wird über die Route des Stream-Advance bestätigt. Wenn ein Batch nicht bestätigt wird, wird der Stream nicht mit der Bereitstellung des nächsten Batches fortfahren. Daher wird der Prozess des Abrufens von Kommentaren aus Communications Mining als Vorab-Abruf-Schleife bezeichnet.

Definieren Sie eine Dienstprogrammfunktion, die Kommentare abruft, indem Sie die abrufende Schleife wiederholen, bis alle Kommentare abgerufen sind. Zu Demonstrationszwecken speichert diese Funktion alle abgerufenen Kommentare im Arbeitsspeicher. In einem Produktionsszenario oder einem Szenario mit großen Datenmengen sollte jeder Batch von Kommentaren in einen Datenspeicher übertragen oder stattdessen an eine Datei angehängt werden.

Da der Stream den zuletzt abgerufenen Kommentar nachverfolgt, kann dieser Prozess sicher angehalten und fortgesetzt werden.

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

An diesem Punkt können Sie mit der Verarbeitung oder Speicherung der Daten entsprechend Ihren Anforderungen fortfahren.

Wenn Sie die gleichen Daten erneut abrufen müssen (zu Testzwecken), muss der Stream zurückgesetzt werden.

War diese Seite hilfreich?

Hilfe erhalten
RPA lernen – Automatisierungskurse
UiPath Community-Forum
Uipath Logo White
Vertrauen und Sicherheit
© 2005–2024 UiPath. Alle Rechte vorbehalten