Communications Mining Developer Guide
Fetching data for Tableau with Python
This tutorial describes how to use Python 3 to fetch data from the Communications Mining platform into a format suitable for importing into Tableau or a similar analytics application.
PERMISSIONS REQUIRED FOR THIS TUTORIAL
- View sources
- View labels
- Streams admin
- Consume streams
This tutorial will show how to fetch the following data:
Communications Mining fields:
- label prediction for each label in the taxonomy (either 0 if the label is not predicted, or a value between 0.0 and 1.0)
- matched general fields
- Quality of Service score (if Quality of Service is enabled for the dataset)
- Tone score (if Tone is enabled for the dataset)
Comment data:
- IDs
- comment ID
- source ID
- email Message ID (unique ID provided by Exchange)
- thread ID
- email subject
- email body
- email FROM sender
- list of email TO recipients
- list of email CC recipients
- list of email BCC recipients
- mailbox folder (where the email was at the time it was synced)
- number of attachments
- list of attachment names
- email SENT AT timestamp
This section demonstrates how to fetch comments from the Communications Mining platform and convert them into a format suitable for Tableau or similar analytics applications. Please adapt this example to your specific requirements.
Please ensure you are using Python 3 and have the following libraries installed:
- urllib3 and requests (for making requests to the Communications Mining API)
- pandas (for converting the data to a dataframe in the final step of the tutorial)
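If any of these are missing, they can typically be installed with pip; the exact command may vary with your environment:
pip install urllib3 requests pandas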
Identify the resources needed for the following steps.
- Your API endpoint
- For tenants onboarded via UiPath:
https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
- For standalone tenants:
https://<my_domain>.reinfer.io/api/v1
- Your API token
- Name of the dataset from which to fetch comments
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
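The values above are hardcoded for simplicity. If you would rather not keep the API token in the script, one option is to read it from an environment variable instead; the variable name below is only an example, not a platform requirement:

import os

# Optional: read the API token from an environment variable instead of
# hardcoding it ("COMMS_MINING_API_TOKEN" is an example name, not a
# platform requirement).
API_TOKEN = os.environ.get("COMMS_MINING_API_TOKEN", API_TOKEN)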
Create a requests session which will be used for all API requests. It is recommended that you configure it to retry failed requests (see example).
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
    ],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see
# https://requests.readthedocs.io/en/latest/user/advanced/#proxies
# on how to configure a proxy for your `requests` session.
Define a DownloadError exception which will be raised in case of errors.
class DownloadError(Exception):
    pass
Get the label taxonomy (i.e. names of all labels) of the dataset for the model version used by the stream. This is needed for the next step.
# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()
Communications Mining returns comments as nested JSON objects. For use in Tableau or similar applications, the nested JSON objects need to be converted into a more suitable tabular format.
Define a utility function which converts a comment object to a tabular format.
Note that since a comment can have multiple matches of the same general field, in this example all matched entities of the same type are concatenated and put in the same column.
def comment_to_dict(comment, sorted_taxonomy, sorted_entities):
    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]: prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]): prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]["id"],
        "comment_uid": comment["comment"]["uid"],
        "source_id": comment["comment"]["source_id"],
        "comment_timestamp": comment["comment"]["timestamp"],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get("thread_id"),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities
            if item["name"] == entity
        ])
    return comment_dict
By default a stream will return comments newer than its creation time. During development it is often necessary to reset the stream to start from a specific point in time.
STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)
A stream provides comments in batches and keeps track of the last fetched comment. Comments are fetched using the stream fetch route, and a batch is acknowledged using the stream advance route. If a batch isn't acknowledged, the stream won't move on to providing the next batch. Hence the process of fetching comments from Communications Mining is referred to as the fetch-advance loop.
Define a utility function that fetches comments by repeating the fetch-advance loop until all comments are fetched. For demonstration purposes, this function stores all fetched comments in-memory. In a production scenario, or any scenario with a large quantity of data, each batch of comments should be pushed to a data store or appended to a file instead.
Since the stream keeps track of the last fetched comment, it is safe to stop and resume this process.
import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""
    comment_dicts = []
    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )
        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break
        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))
        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)
    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)
# do something with `df`
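As mentioned above, keeping every comment in memory is only suitable for demonstration. For larger volumes, a minimal sketch of the alternative (assuming a JSON Lines output file with an illustrative name) is to replace the in-memory append inside the loop with writing each batch to disk:

import json

BATCHES_FILE = "comments.jsonl"  # illustrative output path

# Inside the fetch-advance loop, instead of appending to `comment_dicts`,
# append each batch of converted comments to a JSON Lines file.
with open(BATCHES_FILE, "a", encoding="utf-8") as f:
    for comment in comments:
        row = comment_to_dict(comment, label_taxonomy, entities)
        f.write(json.dumps(row, default=str) + "\n")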
At this point you can continue with processing or storing the data according to your requirements.
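For example, a minimal sketch of handing the data to Tableau is to flatten list-valued columns and export the dataframe to a CSV file which Tableau can connect to; the file name and the flattening approach are assumptions, not requirements:

OUTPUT_CSV = "communications_mining_comments.csv"  # illustrative file name

# Tableau works best with flat scalar columns, so join list-valued columns
# (e.g. email_to, attachment_names) into single comma-separated strings.
df_flat = df.copy()
for column in df_flat.columns:
    df_flat[column] = df_flat[column].apply(
        lambda value: ", ".join(str(item) for item in value)
        if isinstance(value, list)
        else value
    )

df_flat.to_csv(OUTPUT_CSV, index=False)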
If you need to fetch the same data again (for testing purposes), the stream needs to be reset.