- API-Dokumentation
- Einleitung
- Verwenden der API
- API-Tutorial
- Zusammenfassung
- Quellen
- Datasets
- Anmerkungen
- Anhänge (Attachments)
- Vorhersagen
- Erstellen Sie einen Stream
- Aktualisieren Sie einen Stream
- Rufen Sie einen Stream nach Namen ab
- Rufen Sie alle Streams ab
- Löschen Sie einen Stream
- Ergebnisse aus Stream abrufen
- Kommentare aus einem Stream abrufen (Legacy)
- Bringen Sie einen Stream vor
- Einen Stream zurücksetzen
- Kennzeichnen Sie eine Ausnahme
- Entfernen Sie das Tag einer Ausnahme
- Prüfungsereignisse
- Alle Benutzer abrufen
- CLI
- Integrationsleitfäden
- Exchange Integration mit einem Azure-Dienstbenutzer
- Exchange-Integration mit der Azure-Anwendungsauthentifizierung
- Echtzeit-Automatisierung
- Abrufen von Daten für Tableau mit Python
- Elasticsearch-Integration
- Selbst gehostete EWS-Integration
- UiPath Automatisierungs-Framework
- UiPath Marketplace-Aktivitäten
- offizielle UiPath-Aktivitäten
- Blog
- Wie Maschinen lernen, Wörter zu verstehen: eine Anleitung zu Einbettungen in NLP
- Eingabeaufforderungsbasiertes Lernen mit Transformers
- Ef Robots II: Wissensdegesterration und Feinabstimmung
- Effiziente Transformer I: Warnmechanismen
- Tief hierarchische, nicht überwachte Absichtsmodellierung: Nutzen ohne Trainingsdaten
- Beheben der Anmerkungsverzerrung durch Communications Mining
- Aktives Lernen: Bessere ML-Modelle in weniger Zeit
- Auf Zahlen kommt es an – Bewertung der Modellleistung mit Metriken
- Darum ist Modellvalidierung wichtig
- Vergleich von Communications Mining und Google AutoML für die Ermittlung von Konversationsdaten
Abrufen von Daten für Tableau mit Python
In diesem Tutorial wird beschrieben, wie Daten von der Communications Mining-Plattform in ein Format abgerufen werden, das für den Import in Tableau oder eine ähnliche Analyseanwendung geeignet ist, indem Python 3 verwendet wird.
Berechtigungen, die für dieses Tutorial erforderlich sind
- Quellen anzeigen
- Beschriftungen anzeigen
- Streams-Administrator
- Streams verbrauchen
Dieses Tutorial zeigt, wie die folgenden Daten abgerufen werden:
Communications Mining-Felder:
- Bezeichnungsvorhersage für jede Bezeichnung in der Taxonomie (entweder
0
, wenn die Bezeichnung nicht vorhergesagt wird, oder ein Wert zwischen0.0
und1.0
). - übereinstimmende allgemeine Felder
- Dienstgüte-Bewertung (wenn die Dienstgüte für das Dataset aktiviert ist)
- Stimmungsbewertung (wenn Tone für das Dataset aktiviert ist)
Kommentardaten:
- IDs
- Kommentar-ID
- Quell-ID
- email Nachrichten-ID (eindeutige ID, die von Exchange bereitgestellt wird)
- Thread ID
- E-Mail-Betreff
- E-Mail-Text
- email FROM Absender
- Liste der E-Mail-TO-Empfänger
- Liste der E-Mail-CC-Empfänger
- der BCC-Empfänger von E-Mails
- Postfachordner (in dem sich die E-Mail zum Zeitpunkt der Synchronisierung befand)
- Anzahl der Anlagen
- Liste der Namen der Anhänge
- E-Mail SENT AT Zeitstempel
In diesem Abschnitt wird veranschaulicht, wie Kommentare von der Communications Mining-Plattform abgerufen und in ein Format konvertiert werden, das für Tableau oder ähnliche Analyseanwendungen geeignet ist. Bitte passen Sie dieses Beispiel an Ihre spezifischen Anforderungen an.
Stellen Sie sicher, dass Sie Python 3 verwenden und die folgenden Bibliotheken installiert haben:
urllib3
undrequests
(für Anfragen an die Communications Mining API)pandas
(zum Konvertieren der Daten in Dataframe im letzten Schritt des Tutorials)
Identifizieren Sie die für die folgenden Schritte benötigten Ressourcen.
- Ihr API-Endpunkt
- Für Mandanten, deren Onboarding über UiPath erfolgt ist:
https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
- Für eigenständige Mandanten:
https://<my_domain>.reinfer.io/api/v1
- Für Mandanten, deren Onboarding über UiPath erfolgt ist:
- Ihr API-Token
- Name des Datasets , aus dem Kommentare abgerufen werden sollen
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
requests
-Sitzung, die für alle API-Anforderungen verwendet wird. Es wird empfohlen, sie so zu konfigurieren, dass fehlgeschlagene Anforderungen wiederholt werden (siehe Beispiel).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus
RETRY_STRATEGY = Retry(
total=5,
status_forcelist=[
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
],
allowed_methods=["GET", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})
# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
DownloadError
aus.
class DownloadError(Exception):
pass
class DownloadError(Exception):
pass
Rufen Sie die Bezeichnungstaxonomie ab (d. h Namen aller Bezeichnungen) des Datasets für die Modellversion, die vom Stream verwendet wird. Dies wird für den nächsten Schritt benötigt.
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
# get model version used by the stream
stream_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]
# get label taxonomy
model_stats_response = session.get(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
raise DownloadError(model_stats_response_json)
label_taxonomy = [
label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]
# sort for use in next steps
label_taxonomy.sort()
entities.sort()
Communications Mining gibt Kommentare als verschachtelte JSON-Objekte zurück. Zur Verwendung in Tableau oder ähnlichen Anwendungen müssen die geschachtelten JSON-Objekte in ein geeigneteres Tabellenformat konvertiert werden.
comment
-Objekt in ein Tabellenformat zu konvertieren.
Beachten Sie, dass ein Kommentar mehrere gleiche Entitäten desselben allgemeinen Felds haben kann. In diesem Beispiel werden alle übereinstimmenden Entitäten desselben Typs verkettet und in dieselbe Spalte gesetzt.
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
message = comment["comment"]["messages"][0] # email fields
userprops = comment["comment"]["user_properties"] # comment metadata
labelprops = {
prop["property_name"]:prop["value"]
for prop in comment.get("label_properties", [])
} # QOS and Tone scores (if enabled in dataset)
predictions = {
" > ".join(prediction["name"]):prediction["probability"]
for prediction in comment.get("labels", [])
}
entities = comment.get("entities", [])
attachments = comment["comment"].get("attachments", [])
comment_dict = {
# comment
"comment_id": comment["comment"]['id'],
"comment_uid": comment["comment"]['uid'],
"source_id": comment["comment"]['source_id'],
"comment_timestamp": comment["comment"]['timestamp'],
# email fields
"email_subject": message.get("subject", {}).get("text"),
"email_message": message.get("body", {}).get("text"),
"email_from": message.get("from"),
"email_to": message.get("to", []),
"email_cc": message.get("cc", []),
"email_bcc": message.get("bcc", []),
"email_sent_at": message.get("sent_at"),
"email_message_id": userprops.get("string:Message ID"),
"email_folder": userprops.get("string:Folder"),
"email_num_attachments": len(attachments),
"email_attachments": attachments,
"has_attachments": len(attachments) > 0,
"total_attachment_size_bytes": sum([item["size"] for item in attachments]),
"attachment_names": [item["name"] for item in attachments],
"attachment_types": [item["content_type"] for item in attachments],
"thread_id": comment["comment"].get('thread_id'),
# QOS and Tone scores
"qos_score": labelprops.get("quality_of_service"),
"tone_score": labelprops.get("tone"),
}
for label in sorted_taxonomy:
comment_dict[label] = predictions.get(label, 0)
for entity in sorted_entities:
comment_dict[entity] = ", ".join([
item["formatted_value"]
for item in entities if item["name"] == entity])
return comment_dict
Standardmäßig gibt ein Stream Kommentare zurück, die neuer als seine Erstellungszeit sind. Während der Entwicklung ist es oft erforderlich, den Stream zurückzusetzen, um ab einem bestimmten Zeitpunkt zu starten.
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
STARTING_TIME = "2023-01-03T16:05:00" # change to required starting time
stream_reset_response = session.post(
f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
json={
"to_comment_created_at": STARTING_TIME
},
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
raise DownloadError(stream_reset_response_json)
Ein Stream stellt Kommentare in Batches bereit und verfolgt den zuletzt abgerufenen Kommentar. Kommentare werden über die Route des Stream-Abrufs abgerufen und ein Batch wird über die Route des Stream-Advance bestätigt. Wenn ein Batch nicht bestätigt wird, wird der Stream nicht mit der Bereitstellung des nächsten Batches fortfahren. Daher wird der Prozess des Abrufens von Kommentaren aus Communications Mining als Vorab-Abruf-Schleife bezeichnet.
Definieren Sie eine Dienstprogrammfunktion, die Kommentare abruft, indem Sie die abrufende Schleife wiederholen, bis alle Kommentare abgerufen sind. Zu Demonstrationszwecken speichert diese Funktion alle abgerufenen Kommentare im Arbeitsspeicher. In einem Produktionsszenario oder einem Szenario mit großen Datenmengen sollte jeder Batch von Kommentaren in einen Datenspeicher übertragen oder stattdessen an eine Datei angehängt werden.
Da der Stream den zuletzt abgerufenen Kommentar nachverfolgt, kann dieser Prozess sicher angehalten und fortgesetzt werden.
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
import pandas as pd
def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
"""Fetch comments until no more comments are available"""
comment_dicts = []
while True:
# fetch BATCH_SIZE comments from stream
fetch_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
json={
"size": batch_size,
},
)
# get comments from response
fetch_response_json = fetch_response.json()
if not fetch_response.ok:
raise DownloadError(fetch_response_json)
comments = fetch_response_json["results"]
if len(comments) == 0:
break
# process comments
for comment in comments:
comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
# advance stream using the `sequence_id` from response
advance_response = session.post(
f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
json={
"sequence_id": fetch_response_json["sequence_id"],
},
)
advance_response_json = advance_response.json()
if not advance_response.ok:
raise DownloadError(advance_response_json)
return comment_dicts
BATCH_SIZE = 100 # number of comments to fetch in each `fetch` request. max value is 1024.
comment_dicts = fetch_comments_from_stream(
API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)
df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
An diesem Punkt können Sie mit der Verarbeitung oder Speicherung der Daten entsprechend Ihren Anforderungen fortfahren.
Wenn Sie die gleichen Daten erneut abrufen müssen (zu Testzwecken), muss der Stream zurückgesetzt werden.