Latest Record Merger


This stage generates a group tag for each record from the specified expression, groups the latest records of multiple input points by that tag, and outputs the records group by group. Output is triggered when a record arrives for a point that is set as a trigger. The characteristics of this stage include:

  • It supports input from multiple points of multiple models, and more than one input point can be set as a calculation trigger.

  • It supports output to multiple points of multiple models. The modelId, modelIdPath, and pointId of the input point will be replaced with those of the output point.

  • It does not guarantee idempotent calculation results when a failure of any kind, such as a cluster node exception, causes a retry.
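The grouping and trigger behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the stage's actual implementation: the class name, the record fields (`assetId`, `time`, `value`), and the use of a Python callable in place of the Merged By Expression are all assumptions made for the example.

```python
from collections import defaultdict
from typing import Callable, Dict, Optional, Set, Tuple

Record = dict  # assumed record shape: modelId, pointId, assetId, time, value
PointKey = Tuple[str, str]  # (modelId, pointId)


class LatestRecordMergerSketch:
    """Keeps the latest record per input point inside each tag group."""

    def __init__(self, tag_of: Callable[[Record], str], triggers: Set[PointKey]):
        self.tag_of = tag_of      # stands in for the Merged By Expression
        self.triggers = triggers  # points with Is Trigger enabled
        self.groups: Dict[str, Dict[PointKey, Record]] = defaultdict(dict)

    def on_record(self, record: Record) -> Optional[dict]:
        tag = self.tag_of(record)
        key = (record["modelId"], record["pointId"])
        self.groups[tag][key] = record  # a newer record replaces the older one
        if key not in self.triggers:
            return None  # non-trigger points only update the group
        return {"mergedTag": tag, "mergedRecords": list(self.groups[tag].values())}


# Hypothetical usage: group by asset, emit only when "power" arrives.
merger = LatestRecordMergerSketch(
    tag_of=lambda r: r["assetId"],
    triggers={("modelA", "power")},
)
first = merger.on_record(
    {"modelId": "modelA", "pointId": "temp", "assetId": "a1", "time": 1, "value": 20.5}
)
second = merger.on_record(
    {"modelId": "modelA", "pointId": "power", "assetId": "a1", "time": 2, "value": 3.0}
)
```

In this sketch the first call returns nothing because `temp` is not a trigger, while the second returns the group for tag `a1` containing the latest `temp` and `power` records.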

Configuration

The configuration tabs for this stage are General, Basic, Input/Output, MergerConfig, and CacheConfig.

General

| Name | Required? | Description |
|---|---|---|
| Name | Yes | The name of the stage. |
| Description | No | The description of the stage. |
| Stage Library | Yes | The streaming operator library to which the stage belongs. |
| Required Fields | No | The fields that the data records must contain. If the specified fields are not included, the record will be filtered out. |
| Preconditions | No | The conditions that must be satisfied by the data records. Records that do not meet the conditions will be filtered out. For example, ${record:value('/value') > 0}. For the syntax of EL expressions, see Expression Language. |
| On Record Error | Yes | The processing method for error data. Discard: Error data will be discarded and ignored. Send to Error: Error messages will be reported. Stop Pipeline: The pipeline will be stopped. |
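As a rough illustration of how Required Fields and Preconditions filter records, the following sketch models a record as a dictionary keyed by field path. The function name and the record representation are assumptions for the example, and the lambda is only a Python stand-in for the EL expression ${record:value('/value') > 0}.

```python
from typing import Callable, Iterable


def passes_general_checks(
    record: dict,
    required_fields: Iterable[str],
    precondition: Callable[[dict], bool],
) -> bool:
    """Return True only if the record has every required field and meets the precondition."""
    if any(field not in record for field in required_fields):
        return False  # a required field is missing, so the record is filtered out
    return bool(precondition(record))


# Python stand-in for the precondition ${record:value('/value') > 0}
value_is_positive = lambda r: r["/value"] > 0

kept = passes_general_checks({"/value": 5}, ["/value"], value_is_positive)
dropped_by_condition = passes_general_checks({"/value": -1}, ["/value"], value_is_positive)
dropped_by_field = passes_general_checks({"/other": 5}, ["/value"], value_is_positive)
```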

Basic

| Name | Required? | Description |
|---|---|---|
| Quality Filter | No | Filter the data according to the data quality. Only records that meet the quality conditions will be processed by this stage. |

Input/Output

| Name | Required? | Description |
|---|---|---|
| Input Point | Yes | Specify the measurement point for record merging using the format {modelId}::{pointId}. |
| Is Trigger | No | Specify whether to set the point as the trigger for processing. |
| Output Point | Yes | Specify the point for receiving output results using the format {modelId}::{pointId}. |
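The {modelId}::{pointId} format and the replacement of input identifiers with output identifiers can be sketched as below. The helper names and the flat record layout are hypothetical; modelIdPath is left out because its format is not described here.

```python
from typing import NamedTuple


class PointRef(NamedTuple):
    model_id: str
    point_id: str


def parse_point_ref(text: str) -> PointRef:
    """Split '{modelId}::{pointId}' into its two parts."""
    model_id, sep, point_id = text.strip().partition("::")
    if not sep or not model_id or not point_id:
        raise ValueError(f"expected '{{modelId}}::{{pointId}}', got {text!r}")
    return PointRef(model_id, point_id)


def retarget(record: dict, output: PointRef) -> dict:
    """Return a copy of the record carrying the output point's identifiers."""
    updated = dict(record)
    updated["modelId"] = output.model_id
    updated["pointId"] = output.point_id
    return updated


# Hypothetical model and point names
out_ref = parse_point_ref("ModelB::mergedPower")
result = retarget({"modelId": "ModelA", "pointId": "power", "value": 3.0}, out_ref)
```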

MergerConfig

| Name | Required? | Description |
|---|---|---|
| Merged By Expression | Yes | Specify the expression that generates the tags used to merge the latest records. If the expression includes additional identifying information such as a timestamp, a large number of distinct tags, and therefore a large amount of cached data, might be generated. If Redis is used as the data storage, evaluate the impact on storage in advance and plan accordingly. |
| Cache Expire Time (Minute) | Yes | Specify how long, in minutes, the cached data of a tag is kept after the tag was last updated. |
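One way to picture the expiry setting is a cache in which every update to a tag restarts that tag's timer. The sketch below is an assumption about the general idea, not the stage's real cache; the class name and the injectable clock (used here to keep the example deterministic) are hypothetical.

```python
import time
from typing import Callable, Dict, Tuple


class ExpiringTagCache:
    """Drops a tag's records once the tag has not been updated for the expiry time."""

    def __init__(self, expire_minutes: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = expire_minutes * 60
        self.clock = clock
        self._entries: Dict[str, Tuple[float, dict]] = {}  # tag -> (last update, records)

    def update(self, tag: str, point_key: str, record: dict) -> None:
        _, records = self._entries.get(tag, (0.0, {}))
        records[point_key] = record
        self._entries[tag] = (self.clock(), records)  # every update restarts the timer

    def get(self, tag: str) -> dict:
        self.evict_expired()
        return self._entries.get(tag, (0.0, {}))[1]

    def evict_expired(self) -> None:
        now = self.clock()
        stale = [t for t, (updated, _) in self._entries.items()
                 if now - updated >= self.ttl_seconds]
        for tag in stale:
            del self._entries[tag]


# Hypothetical usage with a fake clock measured in seconds
now = [0.0]
cache = ExpiringTagCache(expire_minutes=1, clock=lambda: now[0])
cache.update("a1", "modelA::temp", {"value": 20.5})
now[0] = 59.0
before_expiry = cache.get("a1")
now[0] = 60.0
after_expiry = cache.get("a1")
```

It also shows why an expression that embeds a timestamp is costly: each distinct tag becomes its own entry and stays in the cache until its expiry time has passed.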

CacheConfig

| Name | Required? | Description |
|---|---|---|
| Cache Type | Yes | Select the storage type for cache data. Options are Redis and Local storage. |

  • Redis: The advantage is that the cached data will not be lost after the stream processing pipeline is paused, restarted, or retried. The disadvantage is that data processing is slower and sensitive to network performance. A network delay of less than 1 ms is recommended; otherwise, data processing performance will be affected.

  • Local: The advantage is that the data processing speed is fast. The disadvantage is that the cached data will be lost after the stream processing pipeline is paused, restarted, or retried.

Output Results

The output results of this stage are included in the attr struct. The fields are described as follows:

| Name | Data Type | Description |
|---|---|---|
| /attr/latestRecordMerger | Map | The data object after merging. |
| mergedTag | String | The tags that are generated based on the EL expression. |
| mergedRecords | List | The list of records after merging. |
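For orientation, an output record might be read as in the sketch below. It assumes that mergedTag and mergedRecords are keys inside the latestRecordMerger map; the tag value and the fields inside each merged record are invented for illustration only.

```python
# Hypothetical output record; all values and the fields of each merged record are illustrative.
output_record = {
    "attr": {
        "latestRecordMerger": {        # Map: the data object after merging
            "mergedTag": "asset-001",  # String: tag generated by the EL expression
            "mergedRecords": [         # List: the records after merging
                {"modelId": "modelA", "pointId": "temp", "time": 1, "value": 20.5},
                {"modelId": "modelA", "pointId": "power", "time": 2, "value": 3.0},
            ],
        }
    }
}

merged = output_record["attr"]["latestRecordMerger"]
# Index the merged records by point for convenient lookup downstream.
latest_by_point = {r["pointId"]: r["value"] for r in merged["mergedRecords"]}
print(merged["mergedTag"], latest_by_point)
```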

Output Example

[Figure: output example of the Latest Record Merger stage (lastest_record_merger1.png)]