单元 4: 任务流设计


本节将通过以下方式介绍如何设计智能任务流:

  • 基于样例任务流设计可供快速开始的任务流。
  • 新建任务流以熟悉智能任务流的整体功能。


若要了解任务流产品中各类算子的功能和使用方法,可参考算子参考文档

选项 1:基于样例任务流设计任务流

导出样例任务流


通过以下步骤导出样例任务流配置:

  1. 登录 EnOS 管理控制台,从左侧导航栏中选择 智能工作室 > 智能任务流,打开 实验列表 首页。
  2. 选择 样例作业流,找到 wind-power-forecast 并选择 任务流查看。本教程使用的样例任务流在 2.3 更新 1 中进行了更新。
  3. 选择导出 export_icon 下载任务流配置文件。

新建实验


通过以下步骤新建实验:

  1. 登录 EnOS 管理控制台,从左侧导航栏中选择 智能工作室 > 智能任务流,打开 实验列表 首页。
  2. 选择 新建实验,输入实验的名称(winddemo)和描述。
  3. 选择 确定,创建实验并打开实验的 任务流设计 页面,进行任务流的设计和开发。
  4. 选择导入 import_icon 将样例任务流配置导入。

更新全局参数设计


当任务流中有某些参数需经常变动且全局适用时,建议将其作为全局参数:

  • 易于调试。调试代码时,全局参数的配置是全局性的,避免了重复每个算子中的相同参数的重新配置,将显著提升开发效率
  • 易于传参调用。外部调用任务流时可以通过全局参数传递参数给算子使用
  • 易于任务流模板沉淀。将需要变化的参数作为全局参数统一配置,也可保持算子本身设计的稳定性


本教程中使用的全局参数已在样例任务流中配置,可选择画布右侧的任务流设置图标 pipeline_setting 查看。其中每个参数的含义如下表所示:

编号 参数名称 含义
1 resourcepool 用于指定模型部署实例使用的资源池。需要更新为本 OU 下的资源池。
2 datasetname 用于指定使用的数据集名称。本教程中使用的是样例数据集。
3 tasktype

用于控制任务流的分支走向:

  • prediction 表示任务流会走向预测分支
  • training 表示任务流会走向训练分支
4 predictiontype

用于控制任务流的预测类型:

  • file 表示通过 Mlflow 模型文件预测
  • service 表示通过发布的模型服务预测
5 lenoflist 用于指定产生的 looplist 的长度。其值模拟的是实际场景中的风机数量,依据使用场景的不同而有不同定义。
6 modelinstnacename 用于指定训练好的模型部署实例名称。
7 prediction_dataset_name 用于指定训练数据集。
8 sample_split_ratio 用于指定数据集中训练集和预测集的分配比例。
9 hdfs_source 用于指定将模型文件上传的 HDFS 数据源。数据源需先在 数据源连接 中创建。
10 hive_source 用于指定将预测结果上传的 Hive 数据源。数据源需先在 数据源连接 中创建。
11 ouid 当前 OU ID。
12 prediction_instance 用于指定预测任务的实例名称。当训练分支完成后,其值应与 modelinstnacename 一致以便使用训练好的模型进行预测。

主画布设计


主画布中包括以下两个条件逻辑算子:

  • Condition for Training:全局参数 tasktype==training 时执行训练分支任务。
  • Condition for Prediction:全局参数 tasktype==prediction 时执行预测分支任务。

检查训练分支任务


双击 Condition for training 算子可进入其子画布,其中包含以下算子:

  • Recursion for Event Trigger,确定原始数据何时可用于模型训练的 Recursion 算子。在本教程中,其生成的随机数小于5,则表明数据准备就绪。
  • Generate Target List,生成一个循环列表用于后续 ParallelFor 算子循环并发训练任务的 PythonEx 算子。确保 workspacewind_power_forecasting 以及 entrypointgenerate_model_list.py
  • Loop for Model Training,并发执行训练任务的 ParallelFor 算子。双击进入子画布可查看或编辑以下算子。
Operator Name Description
Prepare Training Data 此 PythonEx 算子用于准备训练数据并导出数据文件。确保 workspacewind_power_forecastingentrypointprepare_train_data.py 以及 requirementsrequirements1.txt
Model Training 此 PythonEx 算子用于根据输入数据训练模型并输出训练模型。确保 workspacewind_power_forecastingentrypointtrain_model.py 以及 requirementsrequirements2.txt
Create a Model 此 Model 算子用于创建一个模型。
Create a Model Revision 此 Mlflow Model Version Register 算子用于在指定模型里注册一个 Mlflow 类型的模型版本。
Create a Model Instance 此 Model Instance 算子用于创建一个模型的部署实例。
Create a Model Test Operator 此 Model Test 算子用于测试模型版本是否为可发布成模型服务的有效模型。
Deploy the Model Revision 此 Single Model Deployment 算子可基于已上架的模型版本部署单一模型版本的模型服务(或模型上线),上线成功后,会产生一个可供调用的模型服务。

检查预测分支任务


双击 Condition for Prediction 算子可进入其子画布,其中包含以下算子:

  • Generate Target List for prediction, 产生一个循环列表用于后续 ParallelFor 算子循环并发预测任务的 PythonEx 算子。确保 workspacewind_power_forecasting 以及 requirementsgenerate_model_list.py
  • Loop For prediction,并发执行预测任务的 ParallelFor 算子。Loop For prediction 子画布包含以下 2 个算子:
    • Condition for Service Prediction Type 算子:当全局参数 predictiontype 的参数值为 service 时,基于已发布模型服务执行模型预测任务。
    • Condition for Model File Prediction 算子:当全局参数 predictiontype 的参数值为 file 时,基于 Mlflow 模型文件执行模型预测任务。

Condition for Service Prediction Type 算子


此算子基于已发布的模型服务进行预测。双击可进入子画布,其中包含以下算子:

算子名称 功能描述
Prepare Prediction Data 此 PythonEx 算子用于准备预测使用的数据,通过 result_datafile file 类型的输出参数提供给预测算子使用。确保 workspacewind_power_forecastingentrypointprepare_predict_data.py 以及 requirementsrequirements1.txt
Predict from Service 此 Service Prediction 算子基于模型服务进行预测,通过 predictions file 型参数返回预测结果。
Write results 此 PythonEx 算子用于接收预测算子的预测结果并将结果写入文件。确保 workspacewind_power_forecasting 以及 entrypointwrite_results.py
Check Hive Config 此 PythonCode 算子用于检查当前 OU 下是否有 Hive 库可供预测结果存储。确保 workspacewind_power_forecasting 以及 entrypointcheck_hive_config.py
Export to Hive?
此 Condition 算子在当前 OU 下有 Hive 库时将预测结果保存至 EnOS Hive。双击可进入子画布,其中包含以下算子:
  • Generate Variables,此 PythonEx 算子可生成 HDFS 目录路径以及用于将文件上传至 EnOS Hive 的 SQL 语句。确保 workspacewind_power_forecasting 以及 entrypointgenerate_variables.py
  • HDFS Uploader,此 HDFS Uploader 算子将预测结果上传至指定的 HDFS 目录。
  • Hive,此 Hive 算子将存在 HDFS 中的预测结果写入 Hive 库以便于后续可视化操作。

Condition for Model File Pprediction 算子


此算子基于 Mlflow 模型文件进行预测。双击可进入子画布,其中包含以下算子:

算子名称 功能描述
Get Latest Model Version 此 PythonEx 算子用于获取该模型下的最新模型版本用以预测。确保 workspacewind_power_forecasting 以及 entrypointget_latest_model_version.py
Prepare Prediction Data 2 此 PythonEx 算子用于准备预测使用的数据,通过 result_datafile file 类型的输出参数提供给预测算子使用模型服务进行预测,预测结果通过 predictions file 型参数返回。确保 workspacewind_power_forecastingentrypointprepare_predict_data.py 以及 requirementsrequirements1.txt
Predict from Mlflow Model File 此 Mlflow Model Version Prediction 算子基于 Mlflow 模型文件进行预测,通过 predictions file 类型参数返回预测结果。
Write results 2 此 PythonEx 算子用于接收预测算子的预测结果并将结果写入文件。确保 workspacewind_power_forecasting 以及 entrypointwrite_results.py
Check Hive Config 2 此 PythonCode 算子用于检查当前 OU 下是否有 Hive 库可供预测结果存储。确保 workspacewind_power_forecasting 以及 entrypointcheck_hive_config.py
Export to Hive 2?
此 Condition 算子在当前 OU 下有 Hive 库时将预测结果保存至 EnOS Hive。双击可进入子画布,其中包含以下算子:
  • Generate Variables 2,此 PythonEx 算子可生成 HDFS 目录路径以及用于将文件上传至 EnOS Hive 的 SQL 语句。确保 workspacewind_power_forecasting 以及 entrypointgenerate_variables.py
  • HDFS Uploader 2,此 HDFS Uploader 算子将预测结果上传至指定的 HDFS 目录。
  • Hive 2,此 Hive 算子将存在 HDFS 中的预测结果写入 Hive 库以便于后续可视化操作。

选项 2:新建任务流

新建实验


通过以下步骤新建实验:

  1. 登录 EnOS 管理控制台,从左侧导航栏中选择 智能工作室 > 智能任务流 ,打开 实验列表 首页。
  2. 自建作业流 标签页中选择 新建实验
  3. 在弹出窗口中输入实验的名称(winddemo)和描述。
  4. 选择 确定, 创建实验并打开实验的 任务流设计 页面,进行任务流的设计和开发。

添加全局参数


当任务流中的某些参数需经常变动且全局适用时,建议将其作为全局参数,从而提高配置效率且便于参数管理。


通过以下步骤配置全局参数:

  1. 选择任务流设置图标 pipeline_setting,打开任务流设置页面。
  2. 选择 添加参数,添加以下全局参数:


编号 名称 类型 参数值
1 resourcepool resource_pool 用于指定模型部署实例使用的资源池。在下拉菜单中选择当前 OU 的主资源池。
2 dataset_name string 用于指定使用的数据集。输入 sample-power-forecast 以使用该样例数据集。
3 tasktype string 用于控制任务流的任务类型: 输入 training 进行训练任务,输入 prediction 进行预测任务。此处输入 training ,将预测任务设置为缺省任务类型。
4 predictiontype string 用于控制任务流的预测类型:输入 file 进行 Mlflow 模型文件预测,输入 service 进行模型服务预测。此处输入 service 将模型服务预测设置为缺省预测类型。
5 lenoflist number 用于指定生成 looplist 的长度。其值模拟的是在实际场景中的风机数量。输入 1 ,模拟一个风机的场景。
6 modelinstancename string 用于指定训练好的模型部署实例名称。输入 kmmldsdeployinstance1 指定该模型部署实例。
7 prediction_dataset_name string 用于指定执训练数据集。输入 sample-power-forecast 以使用此数据集训练模型。
8 sample_split_ratio number 用于指定数据集中训练集与预测集的分配比例。 输入 0.75,将75%的数据用于模型训练,25%的数据用于模型预测。
9 hdfs_source hdfs_source 用于指定上传模型文件的 HDFS 数据源,需预先在 数据源连接 中创建。在下拉菜单中选择 HDFS 数据源。
10 hive_source hive_source 用于指定上传预测结果的 Hive 数据源,需预先在 数据源连接 中创建。在下拉菜单中选择 Hive 数据源。
11 ouid string 用于指定所在 OU。 输入当前 OU ID。
12 prediction_instance model_instance 用于指定预测任务的实例名称。当训练分支完成后,其值应与 modelinstnacename 一致以便使用训练好的模型进行预测。输入 kmmldsdeploymentinstance1 指定此预测实例。

主画布设计


主画布中包括两个 Condition 算子:

  • Condition 算子 1:任务类型为“训练”时执行训练分支任务。
  • Condition 算子 2:任务类型为“预测”时执行预测分支任务。


通过以下步骤添加 Condition 算子 1:

  1. 从算子列表中拖拽一个 Condition 算子到主画布中。
  2. 选择此算子,配置以下信息:
    • 名称:输入 Condition for Training
    • 表达式: 配置为 引用 | tasktype | == | 声明| training


通过以下步骤添加 Condition 算子 2:

  1. 从算子列表中拖拽一个 Condition 算子到主画布中。
  2. 选择此算子,配置以下信息:
    • 名称: 输入 Condition for Prediction
    • 表达式:配置为 引用 | tasktype | == | 声明| prediction

配置训练任务


本教程中,训练分支包含数据准备、模型训练、模型版本测试、模型部署等过程。在主画布中,双击 Condition for Training 算子打开子画布,你需要在 Condition for Prediction 子画布中添加 3 个算子:

  • Recursion 算子:确定原始数据何时可用于模型训练。在本教程中,其生成的随机数小于5,则表明数据准备就绪。
  • PythonEx 算子:生成用于后续 ParallelFor 算子循环并发训练的循环列表。
  • ParallelFor 算子:并发执行模型训练任务。


配置准备原始数据的 Recursion 算子


通过以下步骤添加 Recursion 算子:

  1. 从算子列表中拖拽一个 Recursion 算子到 Condition for Training 子画布中。
  2. 双击此 Recursion 算子,进入其子画布。
  3. 从算子列表中拖拽一个 PythonEx 算子到 Recursion 算子的子画布中。
  4. 选择此 PythonEX 算子, 配置以下信息:
    • 名称:输入 Generate a Number between 1-10
    • workplace: 选择 wind_power_forecasting
    • entrypoint: 选择 generate_random_int.py
    • 选择 输出参数 > 添加参数 为算子添加输出参数,并配置为 result_number | number
  5. 选择导航栏上的 Condition for Training 标签,返回 Condition 算子子画布。
  6. 选择此 Recursion 算子,配置以下信息:
    • 名称:输入 Recursion for Event Trigger
    • 表达式:配置为 Reference | Generate a Number between 1-10.result_number | < | Declaration | 5
  7. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。

配置生成循环列表的 PythonEx 算子


通过以下步骤添加 PythonEx 算子:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Training 子画布中。
  2. 连接 Recursion for Event Trigger 算子的输出锚点与该 PythonEx 算子的输入锚点。
  3. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Generate Target List
    • workspace: 选择 wind_power_forecasting
    • entrypoint: 选择 generate_model_list.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 lenoflist | number | Reference | lenoflist
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 result_number | list
  4. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。

配置并发执行模型训练任务的 ParallelFor 算子


通过以下步骤添加 ParallelFor 算子:

  1. 从算子列表中拖拽一个 ParallelFor 算子到 Condition for Training 子画布中。
  2. 连接 Generate Target List 算子的输出锚点与该 ParallelFor 算子的输入锚点。
  3. 选择此 ParallelFor 算子,配置以下信息:
    • 名称:输入 Loop for Model Training
    • 输入参数:配置为 引用 | Generate Target List.result_list | item


Loop For Model Training 算子负责训练模型,并部署训练完成的模型。你需要在 Loop For Model Training 子画布中添加 7 个算子:

  • PythonEx 算子 1:处理训练所需的数据,并输出数据文件。
  • PythonEx 算子 2:基于数据训练模型,并输出训练过的模型。
  • Model 算子:创建一个新模型。
  • Mlflow Model Version Register 算子:在创建的模型里注册一个 Mlflow 类型的模型版本,并导出模型版本文件。
  • Model Instance 算子:创建模型部署实例。
  • Model Test 算子:测试模型版本是否可发布为有效模型。
  • Single Model Deployment 算子:基于已上架的模型版本部署单一模型版本。


通过以下步骤添加 PythonEx 算子 1:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Loop For Model Training 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Prepare Training Data
    • workspace: 选择 wind_power_forecasting
    • entrypoint: 选择 prepare_training_data.py
    • requirement:选择 requirement1.txt
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 datasetname | string | Reference | dataset_name
    • 选择 输入参数 > 添加参数 为算子添加另一输入参数,并配置为 ratio | number | Reference | sample_split_ratio
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 result_datafile | file


通过以下步骤添加 PythonEx 算子 2:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Loop For Model Training 子画布中。
  2. 连接 Prepare Training Data 算子的输出锚点与该 PythonEx 算子的输入锚点。
  3. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Model Training
    • workspace: 选择 wind_power_forecasting
    • entrypoint: 选择 train_model.py
    • requirement:选择 requirement2.txt
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 result_datafile | file | Reference | Prepare Training Data.result_datafile


通过以下步骤添加 Model 算子:

  1. 从算子列表中拖拽一个 Model 算子到 Loop For Model Training 子画布中。

  2. 选择此 Model 算子,配置以下信息:

    • 名称:输入 Create a Model

    • category: 选择 model-predictor

    • model_name: 引用 item

    • input_data_type:选择 Text

    • scope:选择 private

    • technique:选择 matching

    • usecase:选择 Light field

    • publisher:选择 system

    • 为 input_format 参数输入以下代码:

      [   {      "annotations": "",      "defaultValue": 300,      "dtype": "int",      "ftype": "continuous",      "name": "i.set",      "range": [0, 440],      "repeat": 0      },{      "annotations": "",      "defaultValue": 300,      "dtype": "int",      "ftype": "continuous",      "name": "X_basic.forecast_time",      "range": [0, 440],      "repeat": 0},{      "annotations": "",      "defaultValue": 300,      "dtype": "int",      "ftype": "continuous",      "name": "X_basic.horizon",      "range": [0, 440],      "repeat": 0},{      "annotations": "",      "defaultValue": 8,      "dtype": "int",      "ftype": "continuous",      "name": "X-basic.time",      "range": [0, 49],      "repeat": 0},{      "annotations": "",      "defaultValue": 10,      "dtype": "int",      "ftype": "continuous",      "name": "X-basic.hour",      "range": [0, 23],      "repeat": 0},{      "annotations": "",      "defaultValue": 10,      "dtype": "int",      "ftype": "continuous",      "name": "EC.nwp_time",      "range": [0, 23],      "repeat": 0},{      "annotations": "",      "defaultValue": 1.5,      "dtype": "float",      "ftype": "continuous",      "name": "EC.dist",      "range": [1, 2],      "repeat": 0},{      "annotations": "",      "defaultValue": 1.5,      "dtype": "float",      "ftype": "continuous",      "name": "EC.ws",      "range": [1, 2],      "repeat": 0},{      "annotations": "",      "defaultValue": 250,      "dtype": "float",      "ftype": "continuous",      "name": "EC.wd",      "range": [240, 300],      "repeat": 0},{      "annotations": "",      "defaultValue": 1,      "dtype": "float",      "ftype": "continuous",      "name": "EC.rho",      "range": [1, 2],      "repeat": 0},{      "annotations": "",      "defaultValue": 850,      "dtype": "float",      "ftype": "continuous",      "name": "EC.pres",      "range": [820, 900],      "repeat": 0},{      "annotations": "",      "defaultValue": 20,      "dtype": "float",      ftype": "continuous",      "name": "EC.tmp",      "range": [18, 30],      "repeat": 0},{      "annotations": "",      "defaultValue": 1,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.nwp_time",      range": [1, 2],      "repeat": 0},{      "annotations": "",      "defaultValue": 20,      "dtype": "int",      "ftype": "continuous",      "name": "GFS.dist",      "range": [12, 100],      "repeat": 0},{      "annotations": "",      "defaultValue": 1,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.ws",      range": [1, 2],      "repeat": 0      },{      "annotations": "",      "defaultValue": 50,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.wd",      "range": [40, 300],      "repeat": 0      },{      "annotations": "",      "defaultValue": 1,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.rho",      "range": [1, 2],      "repeat": 0},{      "annotations": "",      "defaultValue": 850,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.pres",      "range": [840, 900],      "repeat": 0},{      "annotations": "",      "defaultValue": 19,      "dtype": "float",      "ftype": "continuous",      "name": "GFS.tmp",      "range": [18, 20],      "repeat": 0}      ]
      
    • 为 output_format 参数输入以下代码:

      [   {      "annotations": "",      "defaultValue": 0,      "dtype": "float",      "ftype": "continuous",      "name": "power",      "range": [],      "repeat": 0}]
      
    • interface:选择 REST

    • error_on_exit:选择 false


通过以下步骤添加 Mlflow Model Version Register 算子:

  1. 从算子列表中拖拽一个 Mlflow Model Version Register 算子到 Loop For Model Training 子画布中。

  2. 连接 Create a model 算子和 Model Training 算子的输出锚点与该 Mlflow Model Version Register 算子的输入锚点。

  3. 选择此 Mlflow Model Version Register 算子,配置以下信息:

    • 名称:输入 Create a Model Revision

    • 为 input data 参数输入以下代码:

      {      "data": {      "names": [      "i.set",      "X_basic.forecast_time",      "X_basic.horizon",      "X-basic.time",      "X-basic.hour",      "EC.nwp_time",      "EC.dist",      "EC.ws",      "EC.wd",      "EC.rho",      "EC.pres",      "EC.tmp",      "GFS.nwp_time",      "GFS.dist",      "GFS.ws",      "GFS.wd",      "GFS.rho",      "GFS.pres",      "GFS.tmp"],      "ndarray": [      [      300,      300,      300,      8,      10,      10,      1.5,      1.5,      250,      1,      850,      20,      1,      20,      1,      50,      1,      850,      19]      ]}      }
      
    • version_rule: 选择 time

    • architecture: 选择 X86

    • coprocessor: 选择 None

    • framework:选择 sklearn

    • language: 选择 python3

    • model_reference:引用 Create a Model.model_name_output

    • publisher:选择 system

    • minio_paths:引用 Generate Target List.mlflow_model_file_paths

    • enforce_register: 选择 true,确保在未能通过模型测试时也能正常注册模型版本。

    • serve_as_file:选择 true,算子将保存模型文件,用于模型文件预测任务。


通过以下步骤添加 Model Instance 算子:

  1. 从算子列表中拖拽一个 Model Instance 算子到 Loop For Model Training 子画布中。
  2. 连接 Create a model 算子的输出锚点与该 Model Instance 算子的输入锚点。
  3. 选择此 Model Instance 算子,配置以下信息:
    • 名称:输入 Create a Model Instance
    • name:引用 modelinstancename
    • resource_pool:引用 resourcepool
    • model_name:引用 Create a Model.model_name_output
    • deploy_mode:选择 ONLINE


通过以下步骤添加 Model Test 算子:

  1. 从算子列表中拖拽一个 Model Test 算子到 Loop For Model Training 子画布中。

  2. 连接 Create a Model Revision 算子的输出锚点与该 Model Test 算子的输入锚点。

  3. 选择此 Model Test 算子,配置以下信息:

    • 名称:输入 Create a Model Test

    • 为 input_dat 参数输入以下代码:

      {      "data": {      "names": [      "i.set",      "X_basic.forecast_time",      "X_basic.horizon",      "X-basic.time",      "X-basic.hour",      "EC.nwp_time",      "EC.dist",      "EC.ws",      "EC.wd",      "EC.rho",      "EC.pres",      "EC.tmp",      "GFS.nwp_time",      "GFS.dist",      "GFS.ws",      "GFS.wd",      "GFS.rho",      "GFS.pres",      "GFS.tmp"],      "ndarray": [      [      300,      300,      300,      8,      10,      10,      1.5,      1.5,      250,      1,      850,      20,      1,      20,      1,      50,      1,      850,      19      ]      ]}      }
      
    • model_builder: 引用 Create a Model Revision.model_builder_name

    • test_timeout: 输入 300


通过以下步骤添加 Single Model Deployment 算子:

  1. 从算子列表中拖拽一个 Single Model Deployment 算子到 Loop For Model Training 子画布中。
  2. 连接 Create a Model Instance 算子、Create a Model Revision 算子和 Create a Model Test 算子的输出锚点与该 Single Model Deployment 算子的输入锚点。
  3. 选择此 Single Model Deployment 算子,配置以下信息:
    • 名称:输入 Deploy the Model Revision
    • model_revision:引用 Create a Model Revision.model_revision_name
    • instance_name:引用 Create a Model Instance.instance_name_output
    • request_cpu:输入 0.5
    • request_memory:输入 0.5
    • limit_cpu:输入 0.8
    • limit_memory:输入 0.8
  4. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。

配置完成的 Loop For Model Training 子画布如下所示:

../_images/loop_for_model_training_canvas.png

配置预测任务


在主画布中,双击 Condition for Prediction 算子打开子画布。你需要在 Condition for Prediction 子画布中添加 2 个算子。

  • PythonEx 算子:生成用于模型预测任务的循环列表。
  • ParallelFor 算子:执行模型预测任务。

配置生成循环列表的 PythonEx 算子


通过以下步骤添加 PythonEx 算子:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Prediction 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Generate Target List for Prediction
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 generate_model_list.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 lenoflist | number | Reference | lenoflist
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 result_list | file
  3. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。

配置并发执行模型预测任务的 ParallelFor 算子


通过以下步骤添加 ParallelFor 算子:

  1. 从算子列表中拖拽一个 ParallelFor 算子到 Condition for Prediction 子画布中。
  2. 连接 Generate Target List for Prediction 算子的输出锚点与该 ParallelFor 算子的输入锚点。
  3. 选择此 ParallelFor 算子,配置以下信息:
    • 名称:输入 Loop For prediction
    • 输入参数:配置为 引用 | Generate Target List.result_list | item
  4. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。


本教程包含以下两种模型预测任务类型:基于 Mlflow 模型文件的预测,以及基于已发布模型服务的预测。你需要在 Loop For prediction 子画布中添加 2 个 Condition 算子以执行不同类型的预测任务:

  • Condition 算子 1:当全局参数 predictiontype 的参数值为 service 时,基于已发布模型服务执行模型预测任务。
  • Condition 算子 2:当全局参数 predictiontype 的参数值为 file 时,基于 Mlflow 模型文件执行模型预测任务。


配置执行模型服务预测的 Condition 算子


通过以下步骤添加 Condition 算子:

  1. 从算子列表中拖拽一个 Condition 算子到 Loop For prediction 子画布中。
  2. 选择此算子,配置以下信息:
    • 名称: 输入 Condition for Service Prediction Type
    • 表达式:配置为 引用 | predictiontype | == | 声明| service


Condition for Service Prediction Type 算子执行模型服务预测,并导出预测结果文件。若当前 OU 下有可用 EnOS Hive 数据源,可将预测结果文件上传至 Hive 表格。你需要在 Condition for Service Prediction Type 子画布中添加 5 个算子:

  • PythonEx 算子 1:准备预测数据。
  • Service Prediction 算子:基于模型服务执行预测任务,并输出预测结果。
  • PythonEx 算子 2:将预测结果与真实数据写入文件。
  • PythonCode 算子:检查当前 OU 下是否有可用于存储预测结果文件的 EnOS Hive。
  • Condition 算子:在 EnOS Hive 可用时将结果文件存储至 EnOS Hive。


通过以下步骤添加 PythonEx 算子 1:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Service Prediction Type 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Prepare Prediction Data
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 prepare_predict_data.py
    • requirements: 选择 requirements1.txt
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 datasetname | string | Reference | dataset_name
    • 选择 输入参数 > 添加参数 为算子添加另一输入参数,并配置为 ratio | number | Reference | sample_split_ratio
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 features | file
    • 选择 输出参数 > 添加参数 为算子添加另一输出参数,并配置为 raw_data | file


通过以下步骤添加 Service Prediction 算子:

  1. 从算子列表中拖拽一个 Service Prediction 算子到 Condition for Service Prediction Type 子画布中。
  2. 连接 Prepare Prediction Data 算子的输出锚点与该 Service Prediction 算子的输入锚点。
  3. 选择此 Service Prediction 算子,配置以下信息:
    • 名称:输入 Predict from Service
    • model:引用 item1
    • instance:引用 modelinstancename
    • namespace:引用 resourcepool
    • datatype:选择 csv
    • data:引用 Prepare Prediction Data.features


通过以下步骤添加 PythonEx 算子 2:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Service Prediction Type 子画布中。
  2. 连接 Prepare Prediction Data 算子和 Prediction from Service 算子的输出锚点与该 PythonEx 算子的输入锚点。
  3. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Write results
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 write_results.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 predicted_data | file | Reference | Predict from Service.predictions
    • 选择 输入参数 > 添加参数 为算子添加另一输入参数,并配置为 actual_data | file | Reference | Prepare Prediction Data.raw_data
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 outputfile | file


通过以下步骤添加 PythonCode 算子:

  1. 从算子列表中拖拽一个 PythonCode 算子到 Condition for Service Prediction Type 子画布中。
  2. 选择此 PythonCode 算子,配置以下信息:
    • 名称:输入 Check Hive Config
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 check_hive_config.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 hive_source | hive_source | Reference | hive_source
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 export_to_hive | boolean


通过以下步骤添加 Condition 算子:

  1. 从算子列表中拖拽一个 Condition 算子到 Condition for Service Prediction Type 子画布中。
  2. 连接 Write results 算子和 Check Hive Config 算子的输出锚点与该 Condition 算子的输入锚点。
  3. 选择此 Condition 算子,配置以下信息:
    • 名称:输入 Export to Hive?
    • 表达式: 配置为 引用 | Check Hive Config.export_to_hive | == | 声明| ture


Export to Hive? 算子将上传模型服务预测结果上传至 EnOS Hive。 你需要在 Export to Hive? 子画布中添加 3 个算子:

  • PythonEx 算子:生成 HDFS 目录路径以及用于将文件上传至 EnOS Hive 的 SQL 语句。
  • HDFS Uploader 算子:将预测结果文件上传至 HDFS 目录。
  • Hive 算子:将预测结果储存至 EnOS Hive。


通过以下步骤添加 PythonEx 算子:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Export to Hive? 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Generate variables
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 generate_variables.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 ouid | string | Reference | ouid
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 sql_statements | list
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 hdfs_dest | string


通过以下步骤添加 HDFS Uploader 算子:

  1. 从算子列表中拖拽一个 HDFS Uploader 算子到 Export to Hive? 子画布中。
  2. 连接 Generate variables 算子的输出锚点与该 HDFS Uploader 算子的输入锚点。
  3. 选择此 HDFS Uploader 算子,配置以下信息:
    • 名称:输入 HDFS Uploader
    • data_source: 引用 hdfs_source
    • file:引用 Write results.outputfile
    • filename:引用 item1
    • dest:引用 Generate variables.hdfs_dest
    • overwrite:选择 ture


通过以下步骤添加 Hive 算子:

  1. 从算子列表中拖拽一个 Hive 算子到 Export to Hive? 子画布中。
  2. 连接 Generate variables 算子和 HDFS Uploader 算子的输出锚点与该 Hive 算子的输入锚点。
  3. 选择此 Hive 算子,配置以下信息:
    • 名称:输入 Hive
    • data_source: 引用 hive_source
    • sqls:引用 Generate variables.sql_statements
  4. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。


配置完成的 Condition for Service Prediction 子画布如下所示:

../_images/condition_for_service_prediction_type.png


配置执行模型文件预测的 Condition 算子


通过以下步骤添加 Condition 算子:

  1. 从算子列表中拖拽一个 Condition 算子到 Loop For prediction 子画布中。
  2. 选择此算子,配置以下信息:
    • 名称: 输入 Condition for Model File Prediction
    • 表达式:配置为 引用 | predictiontype | == | 声明| file


Condition for Model File Prediction 算子执行 Mlflow 模型文件预测,并导出预测结果文件。若当前 OU 下有可用 EnOS Hive,可将预测结果文件上传至 Hive 表格。你需要在 Condition for Model File Prediction 子画布中添加 6 个算子:

  • PythonEx 算子 1:获取最新模型版本。
  • PythonEx 算子 2:准备预测数据。
  • Service Prediction 算子:基于模型服务执行预测任务,并输出预测结果。
  • PythonEx 算子 3:将预测结果与真实数据写入文件。
  • PythonEx 算子 4:检查当前OU下是否有可用于存储预测结果文件的 EnOS Hive。
  • Condition 算子:在 EnOS Hive 可用时将结果文件存储至 EnOS Hive。


通过以下步骤添加 PythonEx 算子 1:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Model File Prediction 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Get Latest Model Version
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 get_latest_model_version.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 model_name | model_name | Reference | item1
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 latest_version | model_version


通过以下步骤添加 PythonEx 算子 2:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Model File Prediction 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Prepare Prediction Data 2
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 prepare_predict_data.py
    • requirements: 选择 requirements1.txt
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 datasetname | string | Reference | prediction_dataset_name
    • 选择 输入参数 > 添加参数 为算子添加另一输入参数,并配置为 ratio | number | Reference | sample_split_ratio
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 features | file
    • 选择 输出参数 > 添加参数 为算子添加另一输出参数,并配置为 raw_data | file


通过以下步骤添加 Mlflow Model Version Prediction 算子:

  1. 从算子列表中拖拽一个 Mlflow Model Version Prediction 算子到 Condition for Model File Prediction 子画布中。
  2. 连接 Prepare Prediction Data 2 算子和 Get Latest Model Version 算子的输出锚点与该 Mlflow Model Version Prediction 算子的输入锚点。
  3. 选择此 Mlflow Model Version Prediction 算子,配置以下信息:
    • 名称:输入 Predict from Mlflow Model File
    • model_name:引用 item1
    • model_version:引用 Get Latest Model Version.latest_version
    • data:引用 Prepare Prediction Data 2.features
    • data_type:选择 csv


通过以下步骤添加 PythonEx 算子 3:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Condition for Model File Prediction 子画布中。
  2. 连接 Prepare Prediction Data 2 算子和 Predict from Mlflow Model File 算子的输出锚点与该 PythonEx 算子的输入锚点。
  3. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Write results 2
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 write_results.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 predicted_data | file | Reference | Predict from Mlflow Model File.predictions
    • 选择 输入参数 > 添加参数 为算子添加另一输入参数,并配置为 actual_data | file | Reference | Prepare Prediction Data 2.raw_data
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 outputfile | file


通过以下步骤添加 PythonCode 算子:

  1. 从算子列表中拖拽一个 PythonCode 算子到 Condition for Model File Prediction 子画布中。
  2. 选择此 PythonCode 算子,配置以下信息:
    • 名称:输入 Check Hive Config 2
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 check_hive_config.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 hive_source | hive_source | Reference | hive_source
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 export_to_hive | boolean


通过以下步骤添加 Condition 算子:

  1. 从算子列表中拖拽一个 Condition 算子到 Condition for Model File Prediction 子画布中。
  2. 连接 Write results 2 算子和 Check Hive Config 2 算子的输出锚点与该 Condition 算子的输入锚点。
  3. 选择此 Condition 算子,配置以下信息:
    • 名称:输入 Export to Hive 2?
    • 表达式: 配置为 引用 | Check Hive Config 2.export_to_hive | == | 声明| ture


Export to Hive 2? 算子将上传 Mlflow 模型文件预测结果上传至 EnOS Hive。 你需要在 Export to Hive 2? 子画布中添加 3 个算子:

  • PythonEx 算子:生成 HDFS 目录路径以及用于将文件上传至 EnOS Hive 的 SQL 语句。
  • HDFS Uploader 算子:将预测结果文件上传至 HDFS 目录。
  • Hive 算子:将预测结果储存至 EnOS Hive。


通过以下步骤添加 PythonEx 算子:

  1. 从算子列表中拖拽一个 PythonEx 算子到 Export to Hive 2? 子画布中。
  2. 选择此 PythonEx 算子,配置以下信息:
    • 名称:输入 Generate variables 2
    • workspace:选择 wind_power_forecasting
    • entrypoint:选择 generate_variables.py
    • 选择 输入参数 > 添加参数 为算子添加一个输入参数,并配置为 ouid | string | Reference | ouid
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 sql_statements | list
    • 选择 输出参数 > 添加参数 为算子添加一个输出参数,并配置为 hdfs_dest | string


通过以下步骤添加 Hive 算子:

  1. 从算子列表中拖拽一个 Hive 算子到 Export to Hive? 子画布中。
  2. 连接 Generate variables 算子的输出锚点与该 Hive 算子的输入锚点。
  3. 选择此 Hive 算子,配置以下信息:
    • 名称:输入 Hive 2
    • data_source: 引用 hive_source
    • sqls:引用 Generate variables 2.sql_statements


通过以下步骤添加 HDFS Uploader 算子:

  1. 从算子列表中拖拽一个 HDFS Uploader 算子到 Export to Hive 2? 子画布中。
  2. 连接 Generate variables 2 算子和 Hive2 算子的输出锚点与该 HDFS Uploader 算子的输入锚点。
  3. 选择此 HDFS Uploader 算子,配置以下信息:
    • 名称:输入 HDFS Uploader 2
    • data_source: 引用 hdfs_source
    • file:引用 Write results 2.outputfile
    • filename:引用 item1
    • dest:引用 Generate variables 2.hdfs_dest
    • overwrite:选择 ture
  4. 选择顶部工具栏的保存图标 save_icon 以保存配置信息。


配置完成的 Condition for Model File Prediction 子画布如下所示:

../_images/condition_for_model_file_prediction.png