- 发行说明
- 在开始之前
- 入门指南
- 集成
- 管理访问权限
- 使用流程应用程序
- 创建应用程序
- 正在加载数据
- 自定义流程应用程序
- 数据转换
- TemplateOne 应用程序模板
- “购买到付款”应用程序模板
- “订单到现金”应用模板
- Basic troubleshooting guide
Process Mining
Editing transformations
流程应用程序的转换包含一个 dbt 项目。 下面是 dbt 项目文件夹内容的说明。
文件夹/文件 |
包含 |
---|---|
|
pm_utils 包及其宏。
|
|
运行 dbt时创建的日志。 |
|
自定义宏。 |
|
.sql 文件,用于定义转换。
|
|
.yml 文件,用于定义数据测试。
|
|
.csv 包含配置设置的文件。
|
|
dbt项目的设置。 |
请参见下图。
models\
目录的.sql
文件中定义。 数据转换组织在一组标准的子目录中:
1_input
,2_entities
,3_events
,4_event_logs
,5_business_logic
。
请参阅转换结构。
.sql
文件以 Jinja SQL 编写,允许您在普通 SQL 查询中插入 Jinja 语句。 当 dbt 运行所有.sql
文件时,每个.sql
文件都会在数据库中生成一个新视图或新表。
.sql
文件具有以下结构:
-
With 语句:一个或多个 with 语句,用于包含所需的子表。
{{ ref(‘My_table) }}
引用由另一个 .sql 文件定义的表 文件。{{ source(var("schema_sources"), 'My_table') }}
引用输入表。
- 主查询:定义新表的查询。
-
最终查询:通常在最后使用
Select * from table
等查询。 这样可以在调试时轻松进行子选择。
有关如何有效编写转换的更多提示,请参阅关于编写 SQL 的提示
models\schema\sources.yml
中。 这样,其他模型可以使用{{ source(var("schema_sources"), 'My_table_raw') }}
引用它。 有关示例,请参见下图。
sources.yml
中列出每个新的源表。
加载数据时,会将后缀 _raw 添加到源表的表名称中。 例如,名为 my_table 的表应称为 my_table_raw。
有关更多详细信息,请参阅有关来源的 官方 dbt 文档 。
数据转换必须输出相应应用程序所需的数据模型;每个预期的表格和字段都必须存在。
models\5_business_logic
中的表。 此外,不应删除相应查询中的输出字段。
如果要向流程应用程序添加新字段,可以使用可用于流程应用程序的自定义字段。 将转换中的字段映射到自定义字段,以使其在输出中可用。 确保按照流程应用程序的数据模型中的说明在输出中命名自定义字段。
dbt docs
命令为 dbt 项目生成文档站点,并在默认浏览器中打开该站点。该文档站点还包含一个沿袭图,此图提供了一个实体关系图,以图形方式表示项目中每个数据表之间的沿袭。
dbt docs
上的官方 dbt 文档。
宏使重用常见 SQL 结构变得容易。 有关详细信息,请参阅有关 Jinja 宏的 官方 dbt 文档 。
pm_utils
pm_utils.optional()
宏的 Jinja 代码示例。
csv
文件。 有关详细信息,请参阅有关 Jinja 种子的官方 dbt 文档。
在 Process Mining中,这通常用于在转换中轻松配置映射。
编辑种子文件后,这些文件不会立即在数据库中自动更新。 要指示 dbt 将新的种子文件内容加载到数据库中,请运行
dbt seed
- 仅更新种子文件表,或-
dbt build
- 还将运行所有模型和测试。注意: 如果种子文件最初没有数据记录,则数据库中的数据类型可能未正确设置。 要解决此问题,请调用run dbt seed --full-refresh
。 这还将更新数据库中的列集。
activity_configuration.csv
文件用于设置与活动相关的其他字段。 当两个事件在同一时间戳上发生时, activity_order
将用作决胜局。 有关示例,请参见下图。
sources.yml
中的测试。 这样做是为了检查输入数据的格式是否正确。
简介
通过自定义数据转换和仪表板编辑,您可以创建和使用自定义吞吐量时间指标。 吞吐量时间是两个活动 A 和 B 之间的时间。下面描述了在编辑转换时创建自定义吞吐量时间指标所需执行的步骤,以及如何在流程应用程序仪表板中启用吞吐量时间指标。
使用编辑转换创建自定义吞吐量时间指标
您必须首先计算吞吐量时间,然后将其作为“ 案例” 字段提供。
计算吞吐时间
您可以根据情况计算 活动 A 和 活动 B之间的吞吐量时间。由于每个用例可能会多次发生活动,因此您需要考虑是使用第一次还是最后一次发生的活动。
-
根据 事件日志 创建其他模型,以计算所需的吞吐量时间。 例如, Cases_with_throughput_times。
-
在此模型中,创建预处理表,在其中定义要用于计算的事件结束。 对于每个表格,您需要“ 案例 ID” 和活动的“ 事件结束 ” 。 以下示例说明了如何为案例选择最后一次出现的活动 A。
Event_end_activity_A as ( select Event_log."Case_ID", max(Event_log."Event_end") as "Event_end_activity_A" from Event_log where Event_log."Activity" = 'Activity A' group by Event_log."Case_ID")
Event_end_activity_A as ( select Event_log."Case_ID", max(Event_log."Event_end") as "Event_end_activity_A" from Event_log where Event_log."Activity" = 'Activity A' group by Event_log."Case_ID")备注:在此示例中,如果要选择第一次出现的活动,请将 max 替换为 min。
-
通过将预处理表联接到 事件日志 并计算实际吞吐量时间来定义吞吐量时间表。
提示:您可以使用 pm-utils 包中提供的datediff函数来计算任意两个事件结束之间的时间差。
对于 活动 A 先于 活动 B的实例,应以毫秒为单位计算吞吐量时间。毫秒是用于在应用程序模板中定义持续时间的时间单位。 由于已在预处理表中按案例对吞吐量时间进行分组,因此您可以选择任何记录。 在上面的示例中,使用了聚合 最小值 。 可以定义吞吐量时间表,选择 吞吐量时间 和 案例 ID,如下所示。
Cases_with_throughput_times as ( select Event_log."Case_ID", case when min(Event_end_activity_A."Event_end_activity_A") <= min(Event_end_activity_B."Event_end_activity_B") then {{ pm_utils.datediff('millisecond', 'min(Event_end_activity_A."Event_end_activity_A")', 'min(Event_end_activity_B."Event_end_activity_B")') }} end as "Throughput_time_activity_A_to_activity_B" from Event_log left join Event_end_activity_A on Event_log."Case_ID" = Event_end_activity_A."Case_ID" left join Event_end_activity_B on Event_log."Case_ID" = Event_end_activity_B."Case_ID" group by Event_log."Case_ID)"
Cases_with_throughput_times as ( select Event_log."Case_ID", case when min(Event_end_activity_A."Event_end_activity_A") <= min(Event_end_activity_B."Event_end_activity_B") then {{ pm_utils.datediff('millisecond', 'min(Event_end_activity_A."Event_end_activity_A")', 'min(Event_end_activity_B."Event_end_activity_B")') }} end as "Throughput_time_activity_A_to_activity_B" from Event_log left join Event_end_activity_A on Event_log."Case_ID" = Event_end_activity_A."Case_ID" left join Event_end_activity_B on Event_log."Case_ID" = Event_end_activity_B."Case_ID" group by Event_log."Case_ID)"
计算吞吐时间 (以天为单位,不包括周末)
date_from_timestamp
函数来计算两项活动之间的天数。 此外,您还可借助diff_weekdays
函数筛选出周末。
有关如何计算活动 A和活动 B之间工作日数的信息,请参阅下面的代码示例。
with Event_log as (
select * from {{ ref('Event_log') }}
),
Activity_A as (
select
Event_log."Case_ID",
min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_A"
from Event_log
where Event_log."Activity" = 'Receive invoice'
group by Event_log."Case_ID"
),
Activity_B as (
select
Event_log."Case_ID",
min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_B"
from Event_log
where Event_log."Activity" = 'Pay invoice'
group by Event_log."Case_ID"
),
Total_days_minus_weekends as (
select
Activity_A."Case_ID",
Activity_A."Date_activity_A",
Activity_B."Date_activity_B",
{{ pm_utils.diff_weekdays('Activity_A."Date_activity_A"', 'Activity_B."Date_activity_B"') }}
-- Only compute for cases where both dates are known.
from Activity_A
inner join Activity_B
on Activity_A."Case_ID" = Activity_B."Case_ID"
)
select * from Total_days_minus_weekends
with Event_log as (
select * from {{ ref('Event_log') }}
),
Activity_A as (
select
Event_log."Case_ID",
min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_A"
from Event_log
where Event_log."Activity" = 'Receive invoice'
group by Event_log."Case_ID"
),
Activity_B as (
select
Event_log."Case_ID",
min({{ pm_utils.date_from_timestamp('Event_log."Event_end"') }}) as "Date_activity_B"
from Event_log
where Event_log."Activity" = 'Pay invoice'
group by Event_log."Case_ID"
),
Total_days_minus_weekends as (
select
Activity_A."Case_ID",
Activity_A."Date_activity_A",
Activity_B."Date_activity_B",
{{ pm_utils.diff_weekdays('Activity_A."Date_activity_A"', 'Activity_B."Date_activity_B"') }}
-- Only compute for cases where both dates are known.
from Activity_A
inner join Activity_B
on Activity_A."Case_ID" = Activity_B."Case_ID"
)
select * from Total_days_minus_weekends
计算吞吐时间 (以天为单位,不包括节假日)
请按照以下步骤计算活动 A和活动 B之间的吞吐时间(以天为单位),不包括周末和节假日。
Holidays.csv
文件以定义应计为节假日的日期。 该文件应至少包含每个假日的记录。 使用以下格式:
假期 |
日期 |
工作日 |
元首 |
2024 年 1 月 1 日 |
是 |
复活节 |
2024 年 3 月 31 日 |
否 |
.. |
.. |
.. |
Holidays.csv
文件中的记录用于计算需要从某个日期范围中排除的天数。
Holidays.csv
文件作为种子文件加载到dbt项目中。 有关详细信息,请参阅有关 Jinja 种子的官方 dbt 文档。
date_from_timestamp
和diff_weekdays
函数计算吞吐时间(以天为单位,不包括周末),如计算吞吐时间(以天为单位,不包括周末)。
.csv
文件中存储的给定日期范围内每个案例的记录数。 请参阅下面的示例代码。
Holidays_count as (
select
Total_days_minus_weekends."Case_ID",
count(Holidays."Date") as "Number_of_holidays"
from Total_days_minus_weekends
left join Holidays
on Holidays."Date" between Total_days_minus_weekends."Date_activity_A" and Total_days_minus_weekends."Date_activity_B"
where Holidays."Weekday" = 'Yes'
group by Total_days_minus_weekends."Case_ID"
)
Holidays_count as (
select
Total_days_minus_weekends."Case_ID",
count(Holidays."Date") as "Number_of_holidays"
from Total_days_minus_weekends
left join Holidays
on Holidays."Date" between Total_days_minus_weekends."Date_activity_A" and Total_days_minus_weekends."Date_activity_B"
where Holidays."Weekday" = 'Yes'
group by Total_days_minus_weekends."Case_ID"
)
Weekday = 'Yes'
”用于不减去节假日。 diff_weekday
函数已处理此问题。
5.从为每个案例计算的总天数中减去计算的节假日数。 请参阅下面的示例代码。
Total_days_minus_weekends_and_holidays as (
select
Total_days_minus_weekends."Case_ID",
Total_days_minus_weekends."Number_of_days" - Holidays_count."Number_of_holidays" as "Number_of_days_between_dates"
from Total_days_minus_weekends
inner join Holidays_count
on Total_days_minus_weekends."Case_ID" = Holidays_count."Case_ID"
)
Total_days_minus_weekends_and_holidays as (
select
Total_days_minus_weekends."Case_ID",
Total_days_minus_weekends."Number_of_days" - Holidays_count."Number_of_holidays" as "Number_of_days_between_dates"
from Total_days_minus_weekends
inner join Holidays_count
on Total_days_minus_weekends."Case_ID" = Holidays_count."Case_ID"
)
将吞吐量时间作为案例字段提供
创建吞吐量时间表后,需要将此表联接到“ 案例 ”表,以将其他吞吐量时间数据添加为案例信息。 要使仪表板中的新吞吐量时间字段可用,有必要将新的吞吐量时间字段转换为自定义案例持续时间字段之一。
替换“ 案例 ” 表格中如下所示的自定义案例持续时间行之一:
{{ pm_utils.optional(ref('Cases_base'), '"custom_case_duration_1"', 'integer') }} as "custom_case_duration_1",
{{ pm_utils.optional(ref('Cases_base'), '"custom_case_duration_1"', 'integer') }} as "custom_case_duration_1",
使用新创建的吞吐时间:
Cases_with_throughput_times."Throughput_time_activity_A_to_activity_B" as "custom_case_duration_1",
Cases_with_throughput_times."Throughput_time_activity_A_to_activity_B" as "custom_case_duration_1",
自定义吞吐量时间指标的转换更新现已完成,并且可以导入到应用程序模板中。
在流程应用程序仪表板中启用吞吐量时间指标
在转换中创建自定义吞吐时间后,该时间可在应用程序模板中用作其别名下的案例属性。您可以自定义流程应用程序,以根据已在转换中创建的自定义吞吐时间创建吞吐时间指标。
默认情况下,系统会添加新的自定义持续时间字段作为数字字段。 请务必编辑该字段,并将新字段的“类型”更改为“持续时间”。 另请参阅数据管理器。
- 转到 Data Manager,并创建新指标。
- 选择要用于吞吐量时间的自定义持续时间字段,然后选择“ 平均值 ”或任何其他所需的聚合。 您还可以在 Data Manager中将自定义案例持续时间字段重命名为所需的名称。
- 编辑应用程序,并将新指标放置在要向业务用户提供的图表上。
- 发布仪表板,以在仪表板上提供吞吐时间指标。
在“ 采购到付款 ”和 “订单到现金” 应用程序模板中,“ Purchase_order_items_with_throughput_times ”和 “Sales_order_items_with_throughput_times”中已经分别提供了吞吐量时间计算。 可以在此处添加自定义吞吐量时间,然后在 Purchase_order_items 或 Sales_order_items 上将其作为自定义持续时间提供。
SQL Server vs. Snowflake
在本地开发环境中,转换在 SQL Server 上运行,而 Snowflake 在 Process Mining Automation Suite中运行。 尽管大多数 SQL 语句在 SQL Server 和 Snowflake 上都有效,但语法可能略有不同,从而可能导致不同的返回结果。
要编写可在两个数据库系统上运行的 SQL 语句,请执行以下操作:
- 用双引号将字段名称写入,例如
Table."Field"
。 -
防止使用 Snowflake 和 SQL Server 中不同的 SQL 函数,例如
string_agg()
和listagg()
。pm_utils
包附带一组适用于这两种数据库类型的函数,请参阅 多个数据库。 例如,不使用string_agg()
或listagg()
,pm_utils.string_agg()
将导致两个数据库的行为相同。 如果pm_utils
不包含所需的函数,则应创建 Jinja 语句,以确保在每个数据库上调用正确的函数。
字符串连接
pm_utils.concat()
函数。 这将为 SQL Server 和 Snowflake 生成相同的结果。
pm_utils.concat("This is a nice string", null)
= "This is a nice string"
不应使用+
或||
等运算符连接字符串,因为这两个数据库的运算符不同(Snowflake 使用||
,SQL Server 使用+
)。 此外,标准concat()
函数在两个系统上的行为也不同:
SQL 服务器 |
Snowflake |
---|---|
null 值将被忽略,并视为空字符串。
|
null 值将导致整个结果为null 。
|
排序
在 Snowflake 和 SQL Server 中,排序的处理方式有所不同。
... order by "Attribute_1" desc, "Attribute_2" ...
。Null 值
SQL 服务器 |
Snowflake |
---|---|
默认情况下,
null 首先排序(升序)
|
默认情况下,
null 将排在最后(升序)
|
Handling capital letters
SQL 服务器 |
Snowflake |
---|---|
大写字母按预期排序 (AaBbCc) |
首先按大写字母排序,然后按非大写字母排序 (ABCabc) |
虚线
-Accountant-
。
SQL 服务器 |
Snowflake |
---|---|
排序时将忽略短划线(因此“-Accountant-”与“Accountant”相同) |
短划线将在顶部排序 |
空格处理
当您按值“A”和“A”进行分组时,这在 SQL Server 中被视为一个值,但在 Snowflake 中被视为两个不同的值。 因此,如果您的数据可能会导致此问题,则建议进行修剪。
区分大小写
Table."Field" = "Some_value"
和Table."Field" = "SOME_VALUE"
将在 SQL Server 中返回相同的结果集,但在 Snowflake 中可能会返回两个不同的结果集。
建议您更改本地 SQL Server 数据库的行为以匹配雪花行为,以防止出现任何问题。 这可以通过将数据库排序规则设置为区分大小写的值来实现。
Settings.json
The settings.json file contains settings related to loading input data. These settings are temporary and should be used to switch existing process apps (created before March 2023) to the new data loading behavior. You should not change the settings.json file for new process apps.
创建新的流程应用程序时,请始终确保输入数据所采用的格式符合用于创建新应用程序的应用程序模板的要求。请参阅应用程序模板。
默认情况下,新流程应用程序将应用于新的数据模型。 现有的流程应用程序将继续在 Process Mining 2023.4 和 Process Mining 2023.10中运行。 切换之前的 Process Mining 2024.4、 以确保上传数据继续正常工作。 AddRawTablePostfix 和 StripSpecialCharacters 设置是临时设置,将在 Process Mining 2024.4中删除。 从那时起,所有流程应用程序的运行都将如同这些设置设为 false一样。
设置 |
格式 |
描述 |
添加原始表格后缀 |
boolean | 用于在通过“ 上传数据 ” 选项使用文件上传时向源表添加_raw后缀。 例如,如果您上传的文件名为 Event_log.csv ,并且此设置设置为 true,则该文件将更改为Event_log_raw.csv 。 |
去除特殊字符 |
boolean | 用于删除表格名称和/或字段名称中的特殊字符并用下划线替换空格。例如,名为 Event+end 的字段将更改为 Eventend。 |
将现有流程应用程序与新数据模型设置一起使用
请按照以下步骤操作,将现有流程应用程序与新的数据模型设置一起使用。
-
Download the settings.json.zip and unzip the file.
-
导出流程应用程序的转换。 请参阅在本地环境中编辑数据转换。
-
Add the settings.json file (if not already present) to transformations.
-
确保将 AddRawTablePostfix 和 StripSpecialCharacters 都设置为 false。
-
更改输入文件或转换,以使文件名完全匹配。 例如,如果您的输入转换 需要使用 event_log_raw ,则您的 csv 文件也应称为 event_log_raw.csv。
-
如果您在表格名称或字段名称中使用特殊字符(例如“ ”、“(”或“?”),请确保输入转换使用相同的名称。 例如,字段 Activity 类别在查询中应称为 Activity 类别 ,而不是 Activity_category。
-
导入转换并提取新数据。
数据转换用于将输入数据转换为适合Process Mining的数据。 Process Mining中的转换将写入dbt项目。
本页介绍 dbt。 有关更多详细信息,请参阅 官方 dbt 文档。
pm-utils package
Process Mining应用程序模板附带一个名为pm_utils
的dbt包。 此pm-utils
包包含适用于 Process Mining dbt项目的实用程序函数和宏。 有关pm_utils
的更多信息,请参阅ProcessMining-pm-utils 。
更新用于应用程序模板的 pm-utils 版本
pm-utils
包。
pm-utils
包的新版本时,建议您更新转换中使用的版本,以确保使用 pm-utils
包的最新函数和宏。
pm-utils
包的最新版本的版本号。
pm-utils
版本。
-
下载
pm-utils
版本中的源代码 (zip)。 -
提取
zip
文件并将文件夹重命名为pm_utils 。 -
从内联的数据转换编辑器导出转换并提取文件。
-
将导出的转换中的pm_utils文件夹替换为新的pm_utils文件夹。
-
再次压缩转换的内容,并将其导入到数据转换编辑器中。