Process Mining
2023.4
False
横幅背景图像
Process Mining
上次更新日期 2024年4月19日

Editing transformations

Folder structure

流程应用程序的转换包含一个 dbt 项目。 下面是 dbt 项目文件夹内容的说明。

文件夹/文件

包含

dbt_packages\

pm_utils包及其宏。

logs\

运行 dbt时创建的日志。

macros\

自定义宏。

models\

.sql 定义转换的文件。

models\schema\

.yml 定义数据测试的文件。

seed

.csv 具有配置设置的 .csv 文件。

dbt_project.yml

dbt项目的设置。

请参见下图。



数据转换

数据转换在models\目录的.sql文件中定义。 数据转换组织在一组标准的子目录中:
  • 1_input,
  • 2_entities,
  • 3_events,
  • 4_event_logs,
  • 5_business_logic

请参阅转换结构

.sql文件以 Jinja SQL 编写,允许您在普通 SQL 查询中插入 Jinja 语句。 当 dbt 运行所有.sql文件时,每个.sql文件都会在数据库中生成一个新视图或新表。
通常, .sql文件具有以下结构:
  1. With 语句:一个或多个 with 语句,用于包含所需的子表。

    • {{ ref(‘My_table) }}引用由另一个 .sql 文件定义的表 文件。
    • {{ source(var("schema_sources"), 'My_table') }}引用输入表。
  2. 主查询:定义新表的查询。
  3. 最终查询:通常在最后使用Select * from table等查询。 这样可以在调试时轻松进行子选择。
    docs image

有关如何有效编写转换的更多提示,请参阅关于编写 SQL 的提示

Adding source tables

要将新的源表添加到 dbt 项目,该表必须列在models\schema\sources.yml中。 这样,其他模型可以使用{{ source(var("schema_sources"), 'My_table_raw') }}引用它。 有关示例,请参见下图。


重要提示: 必须在sources.yml中列出每个新的源表。
备注:

加载数据时,会将后缀 _raw 添加到源表的表名称中。 例如,名为 my_table 的表应称为 my_table_raw

有关更详细的信息,请参阅有关 来源 的 官方 dbt 文档

Data output

数据转换必须输出相应应用程序所需的数据模型;每个预期的表格和字段都必须存在。

实际上,这意味着不应删除models\5_business_logic中的表。 此外,不应删除相应查询中的输出字段。

如果要向流程应用程序添加新字段,可以使用可用于流程应用程序的自定义字段。 将转换中的字段映射到自定义字段,以使其在输出中可用。 确保按照流程应用程序的数据模型中的说明在输出中命名自定义字段。

提示:
您可以使用 dbt docs 命令为 dbt 项目生成文档站点,并在默认浏览器中打开该站点。该文档站点还包含一个沿袭图,此图提供了一个实体关系图,以图形方式表示项目中每个数据表之间的沿袭。
有关详细信息,请参阅dbt docs上的官方 dbt 文档

宏可以轻松地重用常见的 SQL 结构。 有关详细信息,请参阅有关Jinja宏的官方 dbt 文档

pm_utils

pm-utils包包含一组通常在 Process Mining 转换中使用的宏。 有关pm_utils宏的更多信息,请参阅ProcessMining-pm-utils
以下是调用pm_utils.optional()宏的 Jinja 代码示例。


种子

种子是用于将数据表添加到转换的csv文件。 有关详细信息,请参阅有关 Jinja 种子的官方 dbt 文档

Process Mining中,这通常用于在转换中轻松配置映射。

编辑种子文件后,这些文件不会立即在数据库中自动更新。 要指示 dbt 将新的种子文件内容加载到数据库中,请运行

  • dbt seed - 仅更新种子文件表,或
  • dbt build - 还将运行所有模型和测试。
    注意: 如果种子文件最初没有数据记录,则数据库中的数据类型可能未正确设置。 要解决此问题,请调用run dbt seed --full-refresh 。 这还将更新数据库中的列集。

Activity configuration

activity_configuration.csv文件用于设置与活动相关的其他字段。 当两个事件在同一时间戳上发生时, activity_order将用作决胜局。 有关示例,请参见下图。


测试

models\schema\文件夹包含一组定义测试的.yml文件。 这些操作将验证预期数据的结构和内容。 有关详细信息,请参阅有关测试的官方 dbt 文档
Process Mining中运行转换时,仅对每次数据提取运行sources.yml中的测试。 这样做是为了检查输入数据的格式是否正确。
注意: 编辑转换时,请确保相应地更新测试。 如果需要,可以删除测试。

自定义吞吐量时间指标

简介

通过自定义数据转换和仪表板编辑,您可以创建和使用自定义吞吐量时间指标。 吞吐量时间是两个活动 A 和 B 之间的时间。下面描述了在编辑转换时创建自定义吞吐量时间指标所需执行的步骤,以及如何在流程应用程序仪表板中启用吞吐量时间指标。

使用编辑转换创建自定义吞吐量时间指标

您必须首先计算吞吐量时间,然后将其作为“ 案例” 字段提供。

计算吞吐时间

您可以根据情况计算 活动 A活动 B之间的吞吐量时间。由于每个用例可能会多次发生活动,因此您需要考虑是使用第一次还是最后一次发生的活动。

  1. 根据 事件日志 创建其他模型,以计算所需的吞吐量时间。 例如, Cases_with_throughput_times

  2. 在此模型中,创建预处理表,在其中定义要用于计算的事件结束。 对于每个表格,您需要“ 案例 ID” 和活动的“ 事件结束 ” 。 以下示例说明了如何为案例选择最后一次出现的活动 A。

    Event_end_activity_A as (
        select
            Event_log."Case_ID",
            max(Event_log."Event_end") as "Event_end_activity_A"
        from Event_log
        where Event_log."Activity" = 'Activity A'
        group by Event_log."Case_ID")Event_end_activity_A as (
        select
            Event_log."Case_ID",
            max(Event_log."Event_end") as "Event_end_activity_A"
        from Event_log
        where Event_log."Activity" = 'Activity A'
        group by Event_log."Case_ID")
    备注:

    在此示例中,如果要选择第一次出现的活动,请将 max 替换为 min

  3. 通过将预处理表联接到 事件日志 并计算实际吞吐量时间来定义吞吐量时间表。

    提示:

    您可以使用 pm-utils 包中提供的datediff函数来计算任意两个事件结束之间的时间差。

    对于 活动 A 先于 活动 B的实例,应以毫秒为单位计算吞吐量时间。毫秒是用于在应用程序模板中定义持续时间的时间单位。 由于已在预处理表中按案例对吞吐量时间进行分组,因此您可以选择任何记录。 在上面的示例中,使用了聚合 最小值 。 可以定义吞吐量时间表,选择 吞吐量时间案例 ID,如下所示。

    Cases_with_throughput_times as (
        select
            Event_log."Case_ID",
            case
                when min(Event_end_activity_A."Event_end_activity_A") <= min(Event_end_activity_B."Event_end_activity_B")
                    then {{ pm_utils.datediff('millisecond',
                    'min(Event_end_activity_A."Event_end_activity_A")',
                    'min(Event_end_activity_B."Event_end_activity_B")') }}
            end as "Throughput_time_activity_A_to_activity_B"
        from Event_log
        left join Event_end_activity_A
            on Event_log."Case_ID" = Event_end_activity_A."Case_ID"
        left join Event_end_activity_B
            on Event_log."Case_ID" = Event_end_activity_B."Case_ID"
        group by Event_log."Case_ID)"Cases_with_throughput_times as (
        select
            Event_log."Case_ID",
            case
                when min(Event_end_activity_A."Event_end_activity_A") <= min(Event_end_activity_B."Event_end_activity_B")
                    then {{ pm_utils.datediff('millisecond',
                    'min(Event_end_activity_A."Event_end_activity_A")',
                    'min(Event_end_activity_B."Event_end_activity_B")') }}
            end as "Throughput_time_activity_A_to_activity_B"
        from Event_log
        left join Event_end_activity_A
            on Event_log."Case_ID" = Event_end_activity_A."Case_ID"
        left join Event_end_activity_B
            on Event_log."Case_ID" = Event_end_activity_B."Case_ID"
        group by Event_log."Case_ID)"

计算吞吐时间 (以天为单位,不包括周末)
您可以使用pm-utils包中提供的date_from_timestamp函数来计算两项活动之间的天数。 此外,您还可借助diff_weekdays函数筛选出周末。

有关如何计算活动 A活动 B之间工作日数的信息,请参阅下面的代码示例。

with Event_log as (
    select * from {{ ref('Event_log') }}
),

Activity_A as (
    select
        Event_log."Case_ID",
        min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_A"
    from Event_log
    where Event_log."Activity" = 'Receive invoice'
    group by Event_log."Case_ID"
),

Activity_B as (
    select
        Event_log."Case_ID",
        min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_B"
    from Event_log
    where Event_log."Activity" = 'Pay invoice'
    group by Event_log."Case_ID"
),

Total_days_minus_weekends as (
    select
        Activity_A."Case_ID",
        Activity_A."Date_activity_A",
        Activity_B."Date_activity_B",
        {{ pm_utils.diff_weekdays('Activity_A."Date_activity_A"', 'Activity_B."Date_activity_B"') }}
    -- Only compute for cases where both dates are known.
    from Activity_A
    inner join Activity_B
        on Activity_A."Case_ID" = Activity_B."Case_ID"
)

select * from Total_days_minus_weekendswith Event_log as (
    select * from {{ ref('Event_log') }}
),

Activity_A as (
    select
        Event_log."Case_ID",
        min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_A"
    from Event_log
    where Event_log."Activity" = 'Receive invoice'
    group by Event_log."Case_ID"
),

Activity_B as (
    select
        Event_log."Case_ID",
        min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_B"
    from Event_log
    where Event_log."Activity" = 'Pay invoice'
    group by Event_log."Case_ID"
),

Total_days_minus_weekends as (
    select
        Activity_A."Case_ID",
        Activity_A."Date_activity_A",
        Activity_B."Date_activity_B",
        {{ pm_utils.diff_weekdays('Activity_A."Date_activity_A"', 'Activity_B."Date_activity_B"') }}
    -- Only compute for cases where both dates are known.
    from Activity_A
    inner join Activity_B
        on Activity_A."Case_ID" = Activity_B."Case_ID"
)

select * from Total_days_minus_weekends
计算吞吐时间 (以天为单位,不包括节假日)

请按照以下步骤计算活动 A活动 B之间的吞吐时间(以天为单位),不包括周末和节假日。

1.创建一个Holidays.csv文件以定义应计为节假日的日期。 该文件应至少包含每个假日的记录。 使用以下格式:
假期

日期

工作日

元首

2024 年 1 月 1 日

复活节

2024 年 3 月 31 日

..

..

..

Holidays.csv文件中的记录用于计算需要从某个日期范围中排除的天数。
2.Holidays.csv文件作为种子文件加载到dbt项目中。 有关详细信息,请参阅有关 Jinja 种子的官方 dbt 文档
3.使用pm-utils包中提供的date_from_timestampdiff_weekdays函数计算吞吐时间(以天为单位,不包括周末),如计算吞吐时间(以天为单位,不包括周末)。
4.计算假期.csv文件中存储的给定日期范围内每个案例的记录数。 请参阅下面的示例代码。
Holidays_count as (
    select
        Total_days_minus_weekends."Case_ID",
        count(Holidays."Date") as "Number_of_holidays"
    from Total_days_minus_weekends
    left join Holidays
        on Holidays."Date" between Total_days_minus_weekends."Date_activity_A" and Total_days_minus_weekends."Date_activity_B"
    where Holidays."Weekday" = 'Yes'
    group by Total_days_minus_weekends."Case_ID"
)Holidays_count as (
    select
        Total_days_minus_weekends."Case_ID",
        count(Holidays."Date") as "Number_of_holidays"
    from Total_days_minus_weekends
    left join Holidays
        on Holidays."Date" between Total_days_minus_weekends."Date_activity_A" and Total_days_minus_weekends."Date_activity_B"
    where Holidays."Weekday" = 'Yes'
    group by Total_days_minus_weekends."Case_ID"
)
备注:
在上面的示例中,当假期是星期六或星期日时,筛选条件“ Weekday = 'Yes' ”用于不减去节假日。 diff_weekday函数已处理此问题。

5.从为每个案例计算的总天数中减去计算的节假日数。 请参阅下面的示例代码。

Total_days_minus_weekends_and_holidays as (
    select
        Total_days_minus_weekends."Case_ID",
        Total_days_minus_weekends."Number_of_days" - Holidays_count."Number_of_holidays" as "Number_of_days_between_dates"
    from Total_days_minus_weekends
    inner join Holidays_count
        on Total_days_minus_weekends."Case_ID" = Holidays_count."Case_ID"
)Total_days_minus_weekends_and_holidays as (
    select
        Total_days_minus_weekends."Case_ID",
        Total_days_minus_weekends."Number_of_days" - Holidays_count."Number_of_holidays" as "Number_of_days_between_dates"
    from Total_days_minus_weekends
    inner join Holidays_count
        on Total_days_minus_weekends."Case_ID" = Holidays_count."Case_ID"
)

将吞吐量时间作为案例字段提供

创建吞吐量时间表后,需要将此表联接到“ 案例 ”表,以将其他吞吐量时间数据添加为案例信息。 要使仪表板中的新吞吐量时间字段可用,有必要将新的吞吐量时间字段转换为自定义案例持续时间字段之一。

替换“ 案例 ” 表格中如下所示的自定义案例持续时间行之一:

{{ pm_utils.optional(ref('Cases_base'), '"custom_case_duration_1"', 'integer') }} as "custom_case_duration_1",{{ pm_utils.optional(ref('Cases_base'), '"custom_case_duration_1"', 'integer') }} as "custom_case_duration_1",

使用新创建的吞吐时间:

Cases_with_throughput_times."Throughput_time_activity_A_to_activity_B" as "custom_case_duration_1",Cases_with_throughput_times."Throughput_time_activity_A_to_activity_B" as "custom_case_duration_1",

自定义吞吐量时间指标的转换更新现已完成,并且可以导入到应用程序模板中。

在流程应用程序仪表板中启用吞吐量时间指标

在转换中创建自定义吞吐时间后,该时间可在应用程序模板中用作其别名下的案例属性。您可以自定义流程应用程序,以根据已在转换中创建的自定义吞吐时间创建吞吐时间指标。

备注:

默认情况下,系统会添加新的自定义持续时间字段作为数字字段。 请务必编辑该字段,并将新字段的“类型”更改为“持续时间”。 另请参阅数据管理器

  • 转到 Data Manager,并创建新指标。
  • 选择要用于吞吐量时间的自定义持续时间字段,然后选择“ 平均值 ”或任何其他所需的聚合。 您还可以在 Data Manager中将自定义案例持续时间字段重命名为所需的名称。
  • 编辑应用程序,并将新指标放置在要向业务用户提供的图表上。
  • 发布仪表板,以在仪表板上提供吞吐时间指标。
备注:

在“ 采购到付款 ”和 “订单到现金” 应用程序模板中,“ Purchase_order_items_with_throughput_times ”和 “Sales_order_items_with_throughput_times”中已经分别提供了吞吐量时间计算。 可以在此处添加自定义吞吐量时间,然后在 Purchase_order_itemsSales_order_items 上将其作为自定义持续时间提供。

SQL differences between Snowflake and SQL Server

SQL Server vs. Snowflake

在本地开发环境中,转换在 SQL Server 上运行,而 Snowflake 在 Process Mining Automation Suite中运行。 尽管大多数 SQL 语句在 SQL Server 和 Snowflake 上都有效,但语法可能略有不同,从而可能导致不同的返回结果。

要编写可在两个数据库系统上运行的 SQL 语句,请执行以下操作:

  • 用双引号将字段名称写入,例如 Table."Field"
  • 防止使用 Snowflake 和 SQL Server 中不同的 SQL 函数,例如 string_agg()listagg()
    pm_utils包附带一组可用于这两种数据库类型的函数,请参阅多个数据库。 例如,如果使用string_agg() listagg()pm_utils.string_agg(),则两个数据库将产生相同的行为,而不是 或 。如果pm_utils不包含所需的函数,则应创建Jinja语句,以确保在每个数据库上调用正确的函数。

字符串连接

要合并为字符串,请使用pm_utils.concat()函数。 这将为 SQL Server 和 Snowflake 生成相同的结果。
示例: pm_utils.concat("This is a nice string", null) = "This is a nice string"不应使用+||等运算符连接字符串,因为这两个数据库的运算符不同(Snowflake 使用|| ,SQL Server 使用+ )。 此外,标准concat()函数在两个系统上的行为也不同:

SQL 服务器

Snowflake

null 值将被忽略并被视为空字符串。
null 值将导致整个结果为null

排序

在 Snowflake 和 SQL Server 中,排序的处理方式有所不同。

示例: ... order by "Attribute_1" desc, "Attribute_2" ...

Null 值

SQL 服务器

Snowflake

null 将默认排在前面 (升序)
null 默认情况下排在最后 (升序)

Handling capital letters

SQL 服务器

Snowflake

大写字母按预期排序 (AaBbCc)

首先按大写字母排序,然后按非大写字母排序 (ABCabc)

虚线

示例: -Accountant-

SQL 服务器

Snowflake

排序时将忽略短划线(因此“-Accountant-”与“Accountant”相同)

短划线将在顶部排序

空格处理

当您按值“A”和“A”进行分组时,这在 SQL Server 中被视为一个值,但在 Snowflake 中被视为两个不同的值。 因此,如果您的数据可能会导致此问题,则建议进行修剪。

区分大小写

默认情况下,SQL Server 不区分大小写,而 Snowflake 区分大小写。 这意味着Table."Field" = "Some_value"Table."Field" = "SOME_VALUE"将在 SQL Server 中返回相同的结果集,但在 Snowflake 中可能会返回两个不同的结果集。

建议您更改本地 SQL Server 数据库的行为以匹配雪花行为,以防止出现任何问题。 这可以通过将数据库排序规则设置为区分大小写的值来实现。

Configuration settings for loading input data

Settings.json

The settings.json file contains settings related to loading input data. These settings are temporary and should be used to switch existing process apps (created before March 2023) to the new data loading behavior. You should not change the settings.json file for new process apps.

创建新的流程应用程序时,请始终确保输入数据所采用的格式符合用于创建新应用程序的应用程序模板的要求。请参阅应用程序模板

重要提示:

默认情况下,新流程应用程序将应用于新的数据模型。 现有的流程应用程序将继续在 Process Mining 2023.4Process Mining 2023.10中运行。 切换之前的 Process Mining 2024.4、 以确保上传数据继续正常工作。 AddRawTablePostfixStripSpecialCharacters 设置是临时设置,将在 Process Mining 2024.4中删除。 从那时起,所有流程应用程序的运行都将如同这些设置设为 false一样。

设置

格式

描述

添加原始表格后缀

boolean

用于在通过“ 上传数据 ” 选项使用文件上传时向源表添加_raw后缀。 例如,如果您上传的文件名为 Event_log.csv ,并且此设置设置为 true,则该文件将更改为Event_log_raw.csv 。
去除特殊字符

boolean

用于删除表格名称和/或字段名称中的特殊字符并用下划线替换空格。例如,名为 Event+end 的字段将更改为 Eventend

将现有流程应用程序与新数据模型设置一起使用

请按照以下步骤操作,将现有流程应用程序与新的数据模型设置一起使用。

  1. 下载settings.json.zip并解压缩文件。

  2. 导出流程应用程序的转换。 请参阅在本地环境中编辑数据转换

  3. Add the settings.json file (if not already present) to transformations.

  4. 确保将 AddRawTablePostfixStripSpecialCharacters 都设置为 false。

  5. 更改输入文件或转换,以使文件名完全匹配。 例如,如果您的输入转换 需要使用 event_log_raw ,则您的 csv 文件也应称为 event_log_raw.csv

  6. 如果您在表格名称或字段名称中使用特殊字符(例如“ ”、“(”或“?”),请确保输入转换使用相同的名称。 例如,字段 Activity 类别在查询中应称为 Activity 类别 ,而不是 Activity_category

  7. 导入转换并提取新数据。

转换将在平台中再次运行,并且源表将相应地更改。

DBT 项目

数据转换用于将输入数据转换为适合Process Mining的数据。 Process Mining中的转换将写入dbt项目。

本页介绍了dbt 。 有关更详细的信息,请参阅官方 dbt 文档

pm-utils package

Process Mining应用程序模板附带一个名为pm_utilsdbt包。 此pm-utils包包含适用于 Process Mining dbt项目的实用程序函数和宏。 有关pm_utils的更多信息,请参阅ProcessMining-pm-utils

更新用于应用程序模板的 pm-utils 版本

UiPath™ 通过添加新函数,持续改进 pm-utils 包。
当发布 pm-utils 包的新版本时,建议您更新转换中使用的版本,以确保使用 pm-utils 包的最新函数和宏。
您可以在 ProcessMining-pm-utils 的“ 版本 ”面板中找到pm-utils 包最新版的版本号。
请按照以下步骤更新转换中的 pm-utils 版本。
  1. 下载pm-utils版本中的源代码 (zip)。
  2. 提取zip文件并将文件夹重命名为pm_utils
  3. 从内联的数据转换编辑器导出转换并提取文件。

  4. 将导出的转换中的pm_utils文件夹替换为新的pm_utils文件夹。

  5. 再次压缩转换的内容,并将其导入到数据转换编辑器中。

另请参阅 。

此页面是否有帮助?

获取您需要的帮助
了解 RPA - 自动化课程
UiPath Community 论坛
Uipath 白色徽标
信任与安全
© 2005-2024 UiPath. All rights reserved.