Python Evaluator

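Runs native Python scripts so that you can implement custom business logic.

Configuration

The configuration of this stage consists of the General, Basic, Input/Output, and Script sections. The fields in each section are described below.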

General

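Name | Required | Description
Name | Yes | Name of the stage.
Description | No | Description of the stage.
Stage Library | Yes | Library that the stage belongs to.
Required Fields | No | Fields that each record must contain. A record that lacks any of the specified fields is filtered out.
Preconditions | No | Conditions that each record must meet. A record that does not meet the specified conditions is filtered out. Example: ${record:value('/value') > 0}. For how to write EL expressions, see Expression Language.
On Record Error | Yes | How to handle error records. Options:
  • Discard: drop the record.
  • Send to Error: send the record to the error center.
  • Stop Pipeline: stop the pipeline.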
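The Required Fields and Preconditions checks act as a filter in front of the script. The plain-Python sketch below approximates that behavior under stated assumptions: records are modeled as dicts keyed by field name (the stage itself uses field paths such as /value and evaluates preconditions as EL), and the helper names are hypothetical.

```python
# Minimal sketch of the Required Fields / Preconditions filtering described above.
# Assumption: records are plain dicts keyed by field name; the stage itself works
# on record field paths (such as /value) and evaluates preconditions as EL.


def value_is_positive(record):
    """Python counterpart of the EL precondition ${record:value('/value') > 0}."""
    return record['value'] > 0


def passes_checks(record, required_fields, preconditions):
    """Return True if the record has every required field and meets every precondition."""
    if any(field not in record for field in required_fields):
        return False  # a required field is missing: the record is filtered out
    return all(condition(record) for condition in preconditions)


def filter_records(records, required_fields, preconditions):
    """Keep only the records that pass both checks."""
    return [r for r in records if passes_checks(r, required_fields, preconditions)]


incoming = [{'value': 3}, {'value': -1}, {'other': 7}]
kept = filter_records(incoming, ['value'], [value_is_positive])
print(kept)  # [{'value': 3}]
```

Checking required fields first means a record without a value field is dropped before the precondition tries to read it.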

Basic

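Name | Required | Description
Lineage Mapping | Yes | Lineage mapping between data input points and data output points. Options:
  • 1:1: in the Input/Output settings, each input point corresponds to exactly one output point.
  • M:1: in the Input/Output settings, any number of input points and output points can be configured.
Quality Filter | No | Filters incoming data by data quality. Only records that meet the quality conditions are processed by this stage.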

Input/Output

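Name | Required | Description
Input Point | Yes | Data input point, in the format {model identifier}::{point identifier}. Input data enters the subsequent computation only if its modelIdPath and pointId match an input point.
Output Point | No | Data output point, in the format {model identifier}::{point identifier}. Data produced by the Python script becomes an actual output record only if its modelIdPath and pointId match an output point.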
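Both point settings use the {model identifier}::{point identifier} format. A minimal sketch of how such a string could be split and compared with a record follows. It assumes that records are dicts with top-level modelIdPath and pointId keys and that modelIdPath is matched by exact string equality; the IDs windTurbine, inverter, power and windSpeed and the helper names are hypothetical.

```python
# Minimal sketch of matching records against points written as {model ID}::{point ID}.
# Assumptions: each record is a dict with top-level 'modelIdPath' and 'pointId' keys,
# and modelIdPath is compared with the model ID by plain string equality. The model
# and point IDs used below are hypothetical.


def parse_point(point):
    """Split 'modelId::pointId' into a (model_id, point_id) tuple."""
    model_id, sep, point_id = point.partition('::')
    if not sep or not model_id or not point_id:
        raise ValueError('expected {model ID}::{point ID}, got %r' % (point,))
    return model_id, point_id


def matches_any(record, points):
    """True if the record's (modelIdPath, pointId) pair equals one of the points."""
    configured = set(parse_point(p) for p in points)
    return (record.get('modelIdPath'), record.get('pointId')) in configured


input_points = ['windTurbine::power', 'windTurbine::windSpeed']
records = [
    {'modelIdPath': 'windTurbine', 'pointId': 'power', 'value': 1520.0},
    {'modelIdPath': 'inverter', 'pointId': 'power', 'value': 30.5},
]
accepted = [r for r in records if matches_any(r, input_points)]
print([r['modelIdPath'] + '::' + r['pointId'] for r in accepted])  # ['windTurbine::power']
```

The same comparison would apply on the output side, where only script output that matches an Output Point becomes an output record.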

Script

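Name | Required | Description
Python Script | Yes | The custom Python script. In the script, records holds all incoming data for the selected points that has passed quality control.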

Output

After the custom Python script runs, the output of this stage is contained in the attr structure.

Output example

[Figure: sample output of the Python Evaluator (python_evaluator_result1.png)]

Python Script Development Guide

Available Constants:

These constants assign a type to a field whose value is null:

NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG,
NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL,
NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP

Available Objects:

records: an array of the records to process. Depending on the processing mode of the Jython processor, it contains either a single record or all the records in the batch.

state: a dict that is preserved between invocations of this script. It is useful for caching small pieces of data, such as counters.
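A common use of state is a counter that survives from one invocation of the script to the next. The sketch wraps the logic in a hypothetical process() function and uses stand-in classes so it can run outside the stage; in a real script, the body of process() would sit at the top level and use the injected records, state and output. The seenSoFar field is illustrative.

```python
# Sketch: keep a running record count in `state` across script invocations.
# Inside the stage, records, state and output are provided by the runtime and the
# body of process() would sit at the top level of the script. The process() wrapper,
# the Stub* classes and the 'seenSoFar' field are hypothetical, for local runs only.


class StubRecord(object):
    """Stand-in for a runtime record: only the .value map is modelled."""

    def __init__(self, value):
        self.value = value


class StubOutput(object):
    """Stand-in for the runtime `output` object; collects written records."""

    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


def process(records, state, output):
    # state outlives a single invocation, so the counter is created only once
    state.setdefault('seen', 0)
    for record in records:
        state['seen'] += 1
        record.value['seenSoFar'] = state['seen']
        output.write(record)


# Two invocations sharing one state dict, as happens between script runs.
state = {}
out = StubOutput()
process([StubRecord({}), StubRecord({})], state, out)
process([StubRecord({})], state, out)
print(state['seen'])  # 3
```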

log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log rather than to stdout. loglevel is any log4j level, e.g. info, error, warn, trace.

output.write(record): writes a record to the processor output.

error.write(record, message): sends a record to error, together with an error message.
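The sketch below routes records with output.write and error.write and reports through log rather than print. The Stub* classes, the process() wrapper and the value field are hypothetical stand-ins for local runs. Messages are formatted before being passed to log, so the sketch does not depend on how the logger expands the obj... arguments.

```python
# Sketch: send numeric records to output, others to error, and log through `log`
# instead of print. output, error and log are injected by the runtime; the Stub*
# classes, the process() wrapper and the 'value' field are hypothetical stand-ins.
import numbers


class StubRecord(object):
    def __init__(self, value):
        self.value = value


class StubOutput(object):
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


class StubError(object):
    def __init__(self):
        self.failed = []

    def write(self, record, message):
        self.failed.append((record, message))


class StubLog(object):
    def __init__(self):
        self.lines = []

    def info(self, msg):
        self.lines.append(('info', msg))

    def warn(self, msg):
        self.lines.append(('warn', msg))


def process(records, output, error, log):
    for record in records:
        value = record.value.get('value')
        if isinstance(value, numbers.Number):
            output.write(record)
        else:
            # the message travels with the record to error handling
            error.write(record, 'value is not numeric: %r' % (value,))
            log.warn('Rejected a record with value %r' % (value,))
    log.info('Processed %d records' % len(records))


out, err, log = StubOutput(), StubError(), StubLog()
process([StubRecord({'value': 1.5}), StubRecord({'value': 'n/a'})], out, err, log)
print('%d written, %d failed' % (len(out.written), len(err.failed)))  # 1 written, 1 failed
```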

sdcFunctions.getFieldNull(Record, 'field path'): returns a value that you compare with the constants defined above to check whether the field is a typed field whose value is null.
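Typed nulls combine the constants listed earlier with getFieldNull. The sketch assigns NULL_INTEGER to mark a typed null and later replaces such nulls with a default. The sentinel NULL_INTEGER, StubFunctions (whose getFieldNull only looks up top-level fields), StubRecord and the reading field are hypothetical stand-ins; only the comparison pattern is meant to carry over.

```python
# Sketch of the typed-null check: getFieldNull(...) is compared with a NULL_* constant.
# NULL_INTEGER and sdcFunctions come from the runtime; the sentinel object, StubFunctions,
# StubRecord and the 'reading' field below are hypothetical stand-ins for local runs.

NULL_INTEGER = object()  # stand-in for the runtime constant of the same name


class StubRecord(object):
    def __init__(self, value):
        self.value = value


class StubFunctions(object):
    def getFieldNull(self, record, path):
        # Simplified stand-in: looks up a top-level field such as '/reading'
        return record.value.get(path.lstrip('/'))


def fill_missing_readings(records, sdcFunctions, default=0):
    for record in records:
        # true only when /reading holds a typed integer null
        if sdcFunctions.getFieldNull(record, '/reading') is NULL_INTEGER:
            record.value['reading'] = default


missing = StubRecord({'reading': NULL_INTEGER})  # assigning the constant types the null
present = StubRecord({'reading': 42})
fill_missing_readings([missing, present], StubFunctions())
print('%s %s' % (missing.value['reading'], present.value['reading']))  # 0 42
```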

sdcFunctions.createRecord(String recordId): creates a new record. Pass a recordId that uniquely identifies the record and includes enough information to trace the record back to its source.

sdcFunctions.createMap(boolean listMap): creates a map for use as a field in a record. Pass True to create a list map (an ordered map).
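createRecord and createMap can be combined to emit a derived record, for example a summary per invocation. In this sketch the stand-in createMap(True) returns an OrderedDict to reflect the "list map (ordered map)" description; StubFunctions, StubRecord, StubOutput, emit_summary, the batch_tag parameter, the record-ID scheme and the count/total fields are all hypothetical.

```python
# Sketch: emit one extra summary record per invocation. createRecord and createMap are
# runtime functions; StubFunctions (createMap(True) -> OrderedDict, matching the
# "list map (ordered map)" description), the Stub* classes, the record-ID scheme and
# the field names are hypothetical.
from collections import OrderedDict


class StubRecord(object):
    def __init__(self, record_id, value=None):
        self.record_id = record_id
        self.value = value


class StubFunctions(object):
    def createRecord(self, record_id):
        return StubRecord(record_id)

    def createMap(self, list_map):
        return OrderedDict() if list_map else {}


class StubOutput(object):
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


def emit_summary(records, sdcFunctions, output, batch_tag):
    values = [r.value['value'] for r in records if 'value' in r.value]
    # the ID names the producing stage and batch so the record can be traced to its source
    summary = sdcFunctions.createRecord('python-evaluator:summary:' + batch_tag)
    fields = sdcFunctions.createMap(True)  # list map: keys keep insertion order
    fields['count'] = len(values)
    fields['total'] = sum(values)
    summary.value = fields
    output.write(summary)


out = StubOutput()
batch = [StubRecord('in-1', {'value': 2}), StubRecord('in-2', {'value': 3})]
emit_summary(batch, StubFunctions(), out, 'batch-1')
print(list(out.written[0].value.items()))  # [('count', 2), ('total', 5)]
```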

sdcFunctions.createEvent(String type, int version): creates a new, empty event with standard headers.

sdcFunctions.toEvent(Record): sends an event to the event stream. Only events created with sdcFunctions.createEvent are supported.

sdcFunctions.isPreview(): determines whether the pipeline is running in preview mode.
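One way to use the event functions together with isPreview is to raise an alert event only when the pipeline is really running. The sketch assumes that an event, like a record, exposes a settable value; the event type threshold-exceeded, version 1, the threshold and the stand-in classes are hypothetical, and writing the records themselves to output is left out.

```python
# Sketch: emit an event when a value exceeds a threshold, except in preview mode.
# createEvent, toEvent and isPreview are runtime functions. StubFunctions and StubRecord
# are hypothetical stand-ins; the event type, version, settable event.value, field
# names and threshold are assumptions. Writing records to output is omitted.


class StubRecord(object):
    def __init__(self, value=None):
        self.value = value


class StubFunctions(object):
    def __init__(self, preview):
        self.preview = preview
        self.events = []  # events "sent" by toEvent

    def isPreview(self):
        return self.preview

    def createEvent(self, event_type, version):
        event = StubRecord({})
        event.event_type = event_type
        event.version = version
        return event

    def toEvent(self, event):
        self.events.append(event)


def raise_threshold_events(records, sdcFunctions, threshold):
    if sdcFunctions.isPreview():
        return  # no side effects while the pipeline is being previewed
    for record in records:
        value = record.value.get('value')
        if value is not None and value > threshold:
            event = sdcFunctions.createEvent('threshold-exceeded', 1)
            event.value = {'value': value, 'threshold': threshold}
            sdcFunctions.toEvent(event)


batch = [StubRecord({'value': 120}), StubRecord({'value': 80})]
live, preview = StubFunctions(False), StubFunctions(True)
raise_threshold_events(batch, live, 100)
raise_threshold_events(batch, preview, 100)
print('%d live event(s), %d in preview' % (len(live.events), len(preview.events)))
# 1 live event(s), 0 in preview
```

Skipping events in preview is a design choice here, so that previewing a pipeline does not trigger downstream alerts.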

Available Record Header Variables:

record.attributes: a map of record header attributes.

record.<header name>: get the value of 'header name'.
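Header attributes can be read through record.attributes and copied into the record body. The sketch assumes the attributes map supports .get() as a dict does; StubRecord and the site attribute are hypothetical, and only reading attributes is shown.

```python
# Sketch: copy a record header attribute into the record body.
# record.attributes is the header-attribute map described above; StubRecord and the
# attribute/field name 'site' are hypothetical. Only reading attributes is shown.


class StubRecord(object):
    def __init__(self, value, attributes):
        self.value = value
        self.attributes = attributes


def copy_site_attribute(records):
    for record in records:
        site = record.attributes.get('site')  # None when the header has no 'site' attribute
        if site is not None:
            record.value['site'] = site


tagged = StubRecord({'value': 1}, {'site': 'plant-7'})
untagged = StubRecord({'value': 2}, {})
copy_site_attribute([tagged, untagged])
print(tagged.value['site'])  # plant-7
```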

Example script:

# Add additional module search paths (optional; the path below is a placeholder):
import sys
sys.path.append('/some/other/dir/to/search')

for record in records:
  try:
    # Write the record to the processor output
    output.write(record)
  except Exception as e:
    # Trace the exception (sys is already imported above)
    error.trace(sys.exc_info())
    # Send the record to error
    error.write(record, str(e))