Python Evaluator


This stage runs native Python scripts so that you can implement custom business logic.

Configuration

The configuration tabs for this stage are General, Basic, Input/Output, and Script.

General

| Name | Required? | Description |
|---|---|---|
| Name | Yes | The name of the stage. |
| Description | No | The description of the stage. |
| Stage Library | Yes | The streaming operator library to which the stage belongs. |
| Required Fields | No | The fields that each data record must contain. Records that lack any of the specified fields are filtered out. |
| Preconditions | No | The conditions that each data record must satisfy. Records that do not meet the conditions are filtered out. For example, ${record:value('/value') > 0}. For the syntax of EL expressions, see `Expression Language <https://docs.streamsets.com/portal/datacollector/latest/help/datacollector/UserGuide/Expression_Language/ExpressionLanguage_overview.html>`__. |
| On Record Error | Yes | The processing method for error data. Discard: error data is discarded and ignored. Send to Error: error messages are reported. Stop Pipeline: the pipeline is stopped. |
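The filtering behavior of Required Fields and Preconditions can be pictured with a small sketch. It uses plain Python dicts and a lambda as stand-ins for records and for the EL expression. The function name passes_filters and the sample data are hypothetical, and the stage evaluates these settings internally rather than through Python code like this.

```python
def passes_filters(record, required_fields, precondition):
    """Return True if the record has every required field and meets the precondition."""
    if any(field not in record for field in required_fields):
        return False  # a required field is missing, so the record is filtered out
    return precondition(record)


# Hypothetical sample data: plain dicts standing in for data records
sample_records = [{'value': 5}, {'value': -1}, {'other': 3}]

# Mirrors Required Fields = /value and the precondition ${record:value('/value') > 0}
kept = [r for r in sample_records
        if passes_filters(r, ['value'], lambda rec: rec['value'] > 0)]
print(kept)
```

Only the first sample record is kept: the second fails the precondition and the third lacks the required field.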

Basic

| Name | Required? | Description |
|---|---|---|
| Lineage Mapping | Yes | Select the mapping between the input and output points. 1:1: in each line of the Input/Output configuration, one input point corresponds to one output point. M:1: in the Input/Output configuration, multiple input points correspond to one output point. |
| Quality Filter | No | Filters the data according to the data quality. Only records that meet the quality conditions are processed by this stage. |

Input/Output

| Name | Required? | Description |
|---|---|---|
| Input Point | Yes | The input point of the records, in the format {modelId}::{pointId}. The modelIdPath and pointId of the input data must match those of the input point. |
| Output Point | No | The output point of the records, in the format {modelId}::{pointId}. The modelIdPath and pointId of the output data produced by the Python script must match those of the output point. |
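To illustrate the {modelId}::{pointId} format used for input and output points, the following minimal sketch splits a point identifier into its two parts. The helper name parse_point and the sample identifier are hypothetical and are not part of the stage.

```python
def parse_point(point):
    """Split a '{modelId}::{pointId}' string into a (model_id, point_id) tuple."""
    model_id, sep, point_id = point.partition('::')
    if not sep or not model_id or not point_id:
        raise ValueError("expected '{modelId}::{pointId}', got %r" % point)
    return model_id, point_id


# Hypothetical identifier, for illustration only
print(parse_point('turbineModel::windSpeed'))
```

A check like this can help catch a malformed identifier before it is entered in the Input/Output tab.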

Script

| Name | Required? | Description |
|---|---|---|
| Python Script | Yes | Enter the customized Python script. In the script, records represents the input data that has passed the quality filter. |

Output Results

After the customized script runs, the output results of this stage are included in the attr struct.

Output Example

[Image: python_evaluator_result2.png]

Python Script Development Guide

# Available constants:

   These constants assign a type to a field whose value is null.

   NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG

   NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL

   NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP

# Available Objects:
records: an array of records to process. Depending on the processing mode of the Jython processor, it contains either a single record or all the records in the batch.

state: a dict that is preserved between invocations of this script. Useful for caching small pieces of data, such as counters.
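As a rough illustration of how state can carry a counter from one invocation to the next, consider the sketch below. Inside the stage, state and records are supplied by the runtime. Here they are replaced by a plain dict and plain lists so the example runs on its own, and the function name process_batch is hypothetical.

```python
# Stand-in for the runtime-provided `state` dict (assumption: behaves like a normal dict)
state = {}


def process_batch(records, state):
    """Add the size of this batch to a running total kept in state."""
    state['count'] = state.get('count', 0) + len(records)
    return state['count']


# Two simulated invocations of the script; the count carries over between them
print(process_batch(['r1', 'r2'], state))
print(process_batch(['r3'], state))
```

Using state.get with a default avoids a KeyError on the first invocation, when the counter has not been created yet.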

log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout. loglevel is any log4j level, e.g. info, error, warn, trace.

output.write(record): writes a record to processor output

error.write(record, message): sends a record to error

sdcFunctions.getFieldNull(Record, 'field path'): Returns one of the constants defined above when the field is a typed field whose value is null. Use it to check for typed null values.

sdcFunctions.createRecord(String recordId): Creates a new record. Pass a recordId that uniquely identifies the record and includes enough information to track down the record source.

sdcFunctions.createMap(boolean listMap): Creates a map for use as a field in a record. Pass True to create a list map (ordered map).

sdcFunctions.createEvent(String type, int version): Creates a new, empty event with standard headers.

sdcFunctions.toEvent(Record): Sends an event to the event stream. Only events created with sdcFunctions.createEvent are supported.

sdcFunctions.isPreview(): Determines whether the pipeline is in preview mode.
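The following sketch shows one way createRecord, createMap, and isPreview might be combined to emit a new record. Because sdcFunctions and output exist only inside the stage, the classes StubRecord, StubSdcFunctions, and StubOutput are hypothetical stand-ins written for this example. The record.value attribute and the record ID string are also assumptions, not something this page confirms.

```python
from collections import OrderedDict


class StubRecord(object):
    """Hypothetical stand-in for a record; `value` as the root field is an assumption."""
    def __init__(self, record_id):
        self.record_id = record_id
        self.value = None


class StubSdcFunctions(object):
    """Hypothetical stand-in that mimics the signatures listed above."""
    def createRecord(self, record_id):
        return StubRecord(record_id)

    def createMap(self, list_map):
        # True requests a list map (ordered map)
        return OrderedDict() if list_map else {}

    def isPreview(self):
        return False


class StubOutput(object):
    """Hypothetical stand-in that collects written records in a list."""
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


sdcFunctions = StubSdcFunctions()
output = StubOutput()

# --- The part that would live in the Python Script field ---
summary = sdcFunctions.createRecord('python-evaluator-example::summary-1')
summary.value = sdcFunctions.createMap(True)
summary.value['count'] = 3
summary.value['status'] = 'ok'

# Skip the write while previewing, so preview runs produce no extra records
if not sdcFunctions.isPreview():
    output.write(summary)

print(list(summary.value.items()))
```

Whether to suppress output in preview mode is a design choice. It is shown here only to demonstrate isPreview.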

# Available Record Header Variables:

record.attributes: a map of record header attributes.

record.<header name>: get the value of 'header name'.

# Add additional module search paths:

import sys
sys.path.append('/some/other/dir/to/search')

for record in records:
  try:
    # Write the record to processor output
    output.write(record)
  except Exception as e:
    # Trace the exception (sys is already imported above)
    error.trace(sys.exc_info())
    # Send the record to error
    error.write(record, str(e))
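To see how the loop above behaves when one record fails, the sketch below runs the same pattern against stand-in objects. StubRecord, StubOutput, and StubError are hypothetical classes written for this example. The record.value['value'] layout and the scaling factor are assumptions chosen for illustration, not the documented record structure.

```python
import sys


class StubRecord(object):
    """Hypothetical record whose payload is a dict stored in `value` (assumption)."""
    def __init__(self, value):
        self.value = value


class StubOutput(object):
    """Hypothetical stand-in for the runtime-provided `output` object."""
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


class StubError(object):
    """Hypothetical stand-in for the runtime-provided `error` object."""
    def __init__(self):
        self.records = []
        self.traces = []

    def trace(self, exc_info):
        # Keep only the exception type name for this demonstration
        self.traces.append(exc_info[0].__name__)

    def write(self, record, message):
        self.records.append((record, message))


# One well-formed record and one whose value cannot be scaled
records = [StubRecord({'value': 2.0}), StubRecord({'value': 'n/a'})]
output = StubOutput()
error = StubError()

for record in records:
    try:
        # Custom logic: scale the measured value by a hypothetical factor
        record.value['value'] = record.value['value'] * 1.5
        output.write(record)
    except Exception as e:
        error.trace(sys.exc_info())
        error.write(record, str(e))

print(len(output.written), len(error.records), error.traces)
```

The first record is scaled and written to output, while the second raises a TypeError and is routed to error, so one bad record does not stop the rest of the batch.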