communications-mining
latest
false
重要 :
このコンテンツの一部は機械翻訳によって処理されており、完全な翻訳を保証するものではありません。
Communications Mining 開発者ガイド
Last updated 2024年10月3日

Python を使用した Tableau のデータのフェッチ

このチュートリアルでは、Python 3 を使用して、Communications Mining プラットフォームから Tableau や同様の分析アプリケーションへのインポートに適した形式にデータを取得する方法について説明します。

重要:

このチュートリアルに必要なアクセス許可

  • ソースを表示
  • ラベルの表示
  • ストリーム管理者
  • ストリームの使用

このチュートリアルのデータ

このチュートリアルのデータ

このチュートリアルでは、次のデータを取得する方法を示します。

Communications Mining のフィールド

  • 分類データ内の各ラベルのラベル予測 (ラベルが予測されない場合は 0 、または 0.0 から 1.0の間の値)
  • 一致した一般フィールド
  • サービス品質スコア (データセットに対してサービス品質が有効になっている場合)
  • トーン スコア (データセットで Tone が有効になっている場合)

コメントデータ:

  • ID
    • コメント ID
    • ソース ID
    • 電子メール メッセージ ID (Exchange が提供する一意の ID)
    • スレッド ID
  • メールの件名
  • メールの本文
  • 送信者からのメール
  • 電子メール TO 受信者のリスト
  • 電子メールCC受信者のリスト
  • 電子メールBCC受信者のリスト
  • メールボックス フォルダー (同期時のメールの場所)
  • 添付ファイルの数
  • 添付ファイル名のリスト
  • メールの送信日時: タイムスタンプ

Python の例

Python の例

このセクションでは、Communications Mining プラットフォームからコメントを取得し、Tableau や同様の分析アプリケーションに適した形式に変換する方法について説明します。 この例を特定の要件に合わせて調整してください。

Python 3 を使用していて、次のライブラリがインストールされていることを確認してください。

  • urllib3requests (Communications Mining API に要求を送信するため)
  • pandas (チュートリアルの最後の手順でデータをデータフレームに変換するため)

セットアップ

次の手順に必要なリソースを特定します。

  • API エンドポイント
    • UiPath を介してオンボードされたテナントの場合: https://cloud.uipath.com/<my_uipath_organisation>/<my_uipath_tenant>/reinfer_/api/v1
    • スタンドアロン テナントの場合: https://<my_domain>.reinfer.io/api/v1
  • API トークン
  • コメントの取得元の データセット の名前
コメントの取得に使用する ストリーム の名前。 新しいストリームを作成するか、既存のストリームを使用する必要があります。
API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"API_ENDPOINT = "YOUR API ENDPOINT"
API_TOKEN = "YOUR API TOKEN"
DATASET_NAME = "project-name/dataset-name"
STREAM_NAME = "stream-name"
すべての API 要求に使用される requests セッションを作成します。 失敗した要求を再試行するように構成することをお勧めします (例を参照)。
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

RETRY_STRATEGY = Retry(
    total=5,
    status_forcelist=[
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.GATEWAY_TIMEOUT,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.REQUEST_TIMEOUT,
        HTTPStatus.SERVICE_UNAVAILABLE,
],
    allowed_methods=["GET", "POST"],
    backoff_factor=1,
)

adapter = HTTPAdapter(max_retries=RETRY_STRATEGY)
session = Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

session.headers.update({"Authorization": "Bearer " + API_TOKEN})

# If you need to use a proxy to connect to the internet, see# https://requests.readthedocs.io/en/latest/user/advanced/#proxies# on how to configure a proxy for your `requests` session.
次の手順では、エラーが発生した場合に DownloadError が発生します。
class DownloadError(Exception):
    passclass DownloadError(Exception):
    pass

[ラベルの分類] フィールドと [全般] フィールド

ラベルのタクソノミー (つまり、 ストリームで使用されるモデル バージョンのデータセットのすべてのラベルの名前。 これは次の手順で必要になります。

# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()# get model version used by the stream
stream_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}",
)
stream_response_json = stream_response.json()
if not stream_response.ok:
    raise DownloadError(stream_response_json)
model_version = stream_response_json["stream"]["model"]["version"]

# get label taxonomy
model_stats_response = session.get(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/labellers/{model_version}/validation",
)
model_stats_response_json = model_stats_response.json()
if not model_stats_response.ok:
    raise DownloadError(model_stats_response_json)
label_taxonomy = [
    label["name"] for label in model_stats_response_json["validation"]["labels"]
]
entities = [
    entity["name"] for entity in model_stats_response_json["validation"]["entities"]
]

# sort for use in next steps
label_taxonomy.sort()
entities.sort()

コメント変換

Communications Mining では、コメントは入れ子になった JSON オブジェクトとして返されます。 Tableau または同様のアプリケーションで使用するには、入れ子になった JSON オブジェクトを、より適切な表形式に変換する必要があります。

Communications Mining comment オブジェクトを表形式に変換するユーティリティ関数を定義します。

コメントには同じ一般フィールドを複数含めることができるため、この例では、同じタイプの一致するすべてのエンティティが連結され、同じ列に配置されます。

def comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dictdef comment_to_dict(comment, sorted_taxonomy, sorted_entities):

    message = comment["comment"]["messages"][0]  # email fields
    userprops = comment["comment"]["user_properties"]  # comment metadata
    labelprops = {
        prop["property_name"]:prop["value"]
        for prop in comment.get("label_properties", [])
    }  # QOS and Tone scores (if enabled in dataset)
    predictions = {
        " > ".join(prediction["name"]):prediction["probability"]
        for prediction in comment.get("labels", [])
    }
    entities = comment.get("entities", [])
    attachments = comment["comment"].get("attachments", [])
    comment_dict = {
        # comment
        "comment_id": comment["comment"]['id'],
        "comment_uid": comment["comment"]['uid'],
        "source_id": comment["comment"]['source_id'],
        "comment_timestamp": comment["comment"]['timestamp'],
        # email fields
        "email_subject": message.get("subject", {}).get("text"),
        "email_message": message.get("body", {}).get("text"),
        "email_from": message.get("from"),
        "email_to": message.get("to", []),
        "email_cc": message.get("cc", []),
        "email_bcc": message.get("bcc", []),
        "email_sent_at": message.get("sent_at"),
        "email_message_id": userprops.get("string:Message ID"),
        "email_folder": userprops.get("string:Folder"),
        "email_num_attachments": len(attachments),
        "email_attachments": attachments,
        "has_attachments": len(attachments) > 0,
        "total_attachment_size_bytes": sum([item["size"] for item in attachments]),
        "attachment_names": [item["name"] for item in attachments],
        "attachment_types": [item["content_type"] for item in attachments],
        "thread_id": comment["comment"].get('thread_id'),
        # QOS and Tone scores
        "qos_score": labelprops.get("quality_of_service"),
        "tone_score": labelprops.get("tone"),
    }
    for label in sorted_taxonomy:
        comment_dict[label] = predictions.get(label, 0)
    for entity in sorted_entities:
        comment_dict[entity] = ", ".join([
            item["formatted_value"]
            for item in entities if item["name"] == entity])
    return comment_dict

ストリームの開始点を設定

既定では、ストリームは作成時よりも新しいコメントを返します。 開発中は、多くの場合、特定の時点から開始するようにストリームをリセットする必要があります。

STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)STARTING_TIME = "2023-01-03T16:05:00"  # change to required starting time

stream_reset_response = session.post(
    f"{API_ENDPOINT}/datasets/{DATASET_NAME}/streams/{STREAM_NAME}/reset",
    json={
        "to_comment_created_at": STARTING_TIME
    },
)
stream_reset_response_json = stream_reset_response.json()
if not stream_reset_response.ok:
    raise DownloadError(stream_reset_response_json)

フェッチアドバンスのループ

ストリームはコメントをバッチで提供し、最後にフェッチされたコメントを追跡します。 コメントはストリームフェッチルートを使用してフェッチされ、バッチはストリームアドバンスルートを使用して確認されます。 バッチが確認されない場合、ストリームは次のバッチの提供に移行しません。 そのため、Communications Mining からコメントをフェッチするプロセスは、フェッチ/アドバンス ループと呼ばれます。

すべてのコメントがフェッチされるまでフェッチアドバンスループを繰り返して、コメントをフェッチするユーティリティ関数を定義します。 デモンストレーションの目的で、この関数はフェッチされたすべてのコメントをメモリに格納します。 運用シナリオ、または大量のデータを含むシナリオでは、コメントの各バッチをデータ ストアにプッシュするか、代わりにファイルに追加する必要があります。

ストリームは最後にフェッチされたコメントを追跡するため、このプロセスを停止して再開しても安全です。

import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`import pandas as pd

def fetch_comments_from_stream(api_endpoint, dataset_name, stream_name, batch_size, label_taxonomy):
    """Fetch comments until no more comments are available"""

    comment_dicts = []

    while True:
        # fetch BATCH_SIZE comments from stream
        fetch_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/fetch",
            json={
                "size": batch_size,
            },
        )

        # get comments from response
        fetch_response_json = fetch_response.json()
        if not fetch_response.ok:
            raise DownloadError(fetch_response_json)
        comments = fetch_response_json["results"]
        if len(comments) == 0:
            break

        # process comments
        for comment in comments:
            comment_dicts.append(comment_to_dict(comment, label_taxonomy, entities))

        # advance stream using the `sequence_id` from response
        advance_response = session.post(
            f"{api_endpoint}/datasets/{dataset_name}/streams/{stream_name}/advance",
            json={
                "sequence_id": fetch_response_json["sequence_id"],
            },
        )
        advance_response_json = advance_response.json()
        if not advance_response.ok:
            raise DownloadError(advance_response_json)

    return comment_dicts

BATCH_SIZE = 100  # number of comments to fetch in each `fetch` request. max value is 1024.

comment_dicts = fetch_comments_from_stream(
    API_ENDPOINT, DATASET_NAME, STREAM_NAME, BATCH_SIZE, label_taxonomy
)

df = pd.DataFrame.from_records(comment_dicts)

# do something with `df`

この時点で、要件に応じてデータの処理または保存を続行できます。

(テスト目的で) 同じデータを再度取得する必要がある場合は、ストリームをリセットする必要があります。

このページは役に立ちましたか?

サポートを受ける
RPA について学ぶ - オートメーション コース
UiPath コミュニティ フォーラム
Uipath Logo White
信頼とセキュリティ
© 2005-2024 UiPath. All rights reserved.