Unit 4. Developing a Stream Data Processing Job¶
The EnOS Stream Analytics service provides a user-friendly UI for designing stream data processing jobs (pipelines) with StreamSets operators. Developers can quickly configure a pipeline by adding operators (stages) to it, completing data ingestion, filtering, processing, and storage tasks without programming.
In this unit, we will develop a stream data processing job with StreamSets operators to calculate the daily energy production of wind turbines, the daily energy production of wind farms, and the carbon reduction data of wind farms. The detailed scenario of this lab is as follows:

- Simulate the real-time energy production meter reading data of each wind turbine.
- Calculate the daily energy production of each wind turbine using the simulated meter reading data.
- Calculate the daily energy production of each wind farm using the daily energy production of all its wind turbines.
- Calculate the daily carbon reduction of each wind farm using the daily energy production of the wind farm.

To meet the requirements of the above business scenario, we need to use the following StreamSets operators:
| Operator | Description |
|---|---|
| EDH Kafka Consumer User (Data Source) | Getting complete data records from Kafka. |
| Point Selector | Specifying data records of the eos_turbine::ammeter measuring point as the input data. |
| Last Record Appender | Appending the last record of the eos_turbine::ammeter measuring point of a wind turbine to the attr field of the current record. |
| Cumulant Decomposer | Getting the delta value between the current record and the last record in the attr field, for calculating the energy production data in the time interval. |
| Fixed Time Window Aggregator | Calculating the daily energy production data of a wind turbine by summing up the energy production data in all time intervals of the day. The output results will be used for further calculation and sent to Kafka as well. |
| TSL Parent Asset Lookup | Querying the parent node (wind farm) of a wind turbine on the asset tree. |
| Record Generator | Generating a new record at a 1-minute frequency to trigger the calculation of wind farm energy production. |
| Python Evaluator 1 | Appending parent node information to the generated triggering point and tagging the triggering point with a Python script. |
| Latest Record Merger | Merging the triggering point and the daily energy production data of wind turbines by parent node information. |
| Python Evaluator 2 | Calculating the daily energy production data of a wind farm by summing up the daily energy production data of all its wind turbines with a Python script. The output results will be used for further calculation and sent to Kafka as well. |
| Internal HTTP Client | Getting the carbon reduction parameter by calling the EnOS Get Asset API. |
| Python Evaluator 3 | Calculating the daily carbon reduction of a wind farm with a Python script. The output results will be sent to Kafka. |
| EDH Kafka Producer User (Data Destination) | Sending all the output results to Kafka. |
The business scenario is as depicted in the following figure:
For detailed information about the StreamSets operators, see Calculator Library 0.1.0 Documentation.
Creating a StreamSets pipeline¶
Take the following steps to create a StreamSets pipeline:
1. Download the StreamSets pipeline configuration template from https://support.envisioniot.com/docs/data-asset/zh_CN/2.1.0/_static/streamsets_pipeline_demo.json (right-click the link and save the streamsets_pipeline_demo.json file to a local directory).
2. Enter the name and description of the stream processing job.
3. From the Template drop-down list, select Origin Pipeline.
4. From the Operator Version drop-down list, select the installed StreamSets calculator library version.
5. For Message Channel, select the source of the data to be processed. For this tutorial, select Real-Time.
6. Click OK to create the stream processing job with the basic settings above. See the following example:
Adding operators to the pipeline¶
Now we can add the needed operators and connect them with arrows to form the pipeline.
1. Select the arrow between the EDH Kafka Consumer User and EDH Kafka Producer User operators and click the Delete icon to remove the connection.
2. Click the Stage Library icon in the upper right corner of the page, and then click the Point Selector operator to add it to the pipeline canvas.
3. Connect the output point of the EDH Kafka Consumer User operator to the input point of the Point Selector operator.
4. Repeat steps 2 and 3 to add the remaining operators to the pipeline and connect them in the order shown in the following figure.
5. Click the Auto Arrange icon to align the display of the operators in the pipeline.
6. Click Save in the toolbar to save the changes. Save the changes every time you update the configuration of the pipeline, until the pipeline is ready to be published.
Configuring operator parameters¶
After the pipeline is created, we can configure the parameters of the added operators. Select each operator in turn and complete the configuration on each of its tabs. For the General and Basic tabs, use the default configuration. For the other tabs, follow the instructions below for each operator.
Point Selector¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::ammeter | Getting the ammeter point data from Kafka as the input. |
See the following example:
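Conceptually, the Point Selector acts as a filter: only records of the configured measuring point pass through to the rest of the pipeline. The following minimal sketch illustrates the idea; the record field names follow those used by the Python Evaluator scripts later in this unit, and this is not the operator's actual implementation:

    def select_points(records, model_id='eos_turbine', point_id='ammeter'):
        """Yield only the records of the configured measuring point."""
        for record in records:
            if record.get('modelId') == model_id and record.get('pointId') == point_id:
                yield record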
Last Record Appender¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::ammeter | Receiving the wind turbine energy production data as input. |
| Conditions | * | Always appending the last record to the current record (the condition * matches all records). |
| Output Point | eos_turbine::ammeter_withLast | Receiving the output result, which contains the current and the last record of the wind turbine energy production data. |
See the following example:
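The effect of this operator can be pictured as a per-asset cache: for each incoming record, the previously seen record of the same asset is attached under the attr field before the cache is updated. The sketch below is an illustration only; the real operator manages its state internally, and the exact attr key it writes is not shown in this tutorial:

    last_by_asset = {}  # assetId -> last seen eos_turbine::ammeter record

    def append_last(record):
        """Attach the previous record of the same asset, then update the cache."""
        key = record['assetId']
        record.setdefault('attr', {})
        if key in last_by_asset:
            record['attr']['lastRecord'] = last_by_asset[key]  # key name is an assumption
        last_by_asset[key] = {'time': record['time'], 'value': record['value']}
        record['pointId'] = 'ammeter_withLast'  # the configured output point
        return record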
Cumulant Decomposer¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::ammeter_withLast | Receiving the current and the last record of the wind turbine energy production data as input. |
| Scale Type | 1 | Using the fixed scale type for the electric energy meter. |
| Scale | 100 | Specifying the scale value of the electric energy meter. |
| Slope Type | 1 | Using the fixed slope threshold. |
| Min Slope | 0 | Specifying the lower limit of the slope threshold. |
| Max Slope | 100000 | Specifying the upper limit of the slope threshold. |
| Output Point | eos_turbine::delta_production | Receiving the output result, which contains the calculated energy production data of wind turbines in the time interval. |
See the following example:
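The parameters above control how the delta between two consecutive meter readings is computed and validated. The following rough sketch shows one way to read them; in particular, whether Scale multiplies the raw delta and how the slope threshold is applied are assumptions here, not the operator's exact semantics:

    SCALE = 100.0                          # fixed scale value (Scale Type = 1)
    MIN_SLOPE, MAX_SLOPE = 0.0, 100000.0   # fixed slope threshold (Slope Type = 1)

    def decompose(current_value, last_value):
        """Return the energy produced between two cumulative meter readings."""
        delta = (current_value - last_value) * SCALE
        if not (MIN_SLOPE <= delta <= MAX_SLOPE):
            return None  # out-of-range delta (e.g. a meter reset): treat as invalid
        return delta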
Fixed Time Window Aggregator¶
Complete the configuration of TriggerConfig with the following settings:
| Field | Value | Description |
|---|---|---|
| Latency (Minute) | 0 | Disabling the data latency setting. |
| Early Trigger | Enable | Enabling early output of intermediate results before the time window is closed. |
| Early Trigger Type | By Fixed Frequency | Generating early output of intermediate results at a fixed frequency. |
| Early Trigger Frequency (Minute) | 1 | Generating early output at a 1-minute frequency. |
See the following example:
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::delta_production | Receiving the calculated energy production data of wind turbines in all time intervals as input. |
| Fixed Window Size | 1 | Specifying the duration of the fixed time window. |
| Fixed Window Unit | day | Selecting the unit of the fixed time window. For this tutorial, we will calculate the daily energy production data of wind turbines. |
| Aggregator Policy | sum | Summing up the energy production data of wind turbines in all time intervals of one day. |
| Output Point | eos_turbine::production_daily | Receiving the output result, which contains the calculated daily energy production data of wind turbines. The output results will be used for further calculation and sent to Kafka as well. |
See the following example:
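In effect, the operator keeps one running total per wind turbine per day and, because Early Trigger is enabled, emits the current total every minute instead of waiting for the window to close. A minimal sketch of that bookkeeping, ignoring time zones and the latency setting:

    from collections import defaultdict

    daily_sum = defaultdict(float)  # (assetId, day index) -> running daily total

    def aggregate(record):
        day = record['time'] // 86400000  # millisecond timestamp -> day index
        key = (record['assetId'], day)
        daily_sum[key] += record['value']
        # With Early Trigger enabled, the running total is emitted every minute
        # as production_daily, before the 1-day window closes.
        return {'assetId': record['assetId'],
                'pointId': 'production_daily',
                'time': record['time'],
                'value': daily_sum[key]}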
TSL Parent Asset Lookup¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::production_daily | Receiving the calculated daily energy production data of wind turbines as input. |
| Output Point | eos_turbine::production_daily_withparent | Receiving the output result, which contains the calculated daily energy production data of wind turbines and the wind farm information. |
See the following example:
Complete the configuration of Criteria with the following settings:
| Field | Value | Description |
|---|---|---|
| Tree Tag | eos_tree::true | Specifying the tag of the eos_tree asset tree that is created in Unit 1. |
| Attribute | All | Querying wind farm metadata by all asset attribute keys. |
| Tag | None | Not querying parent asset metadata by asset tags. |
| Extra | None | Not querying parent asset metadata by extra information. |
See the following example:
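Based on the merge expression used later (${record:value('/attr/tslParentLookup[0]/id')}), the lookup appends the parent asset found on the eos_tree tree to the record's attr field in a structure like the one sketched below; the asset IDs are placeholders, and any additional attribute fields depend on the Criteria settings above:

    record = {
        'assetId': 'turbine_asset_id',         # the wind turbine
        'pointId': 'production_daily_withparent',
        'value': 1234.5,                       # daily energy production so far
        'attr': {
            'tslParentLookup': [
                {'id': 'wind_farm_asset_id'}   # the parent asset on the tree
                # ...plus the asset attributes selected on the Criteria tab
            ]
        }
    }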
Record Generator¶
Complete the configuration of Basic with the following settings:
| Field | Value | Description |
|---|---|---|
| Trigger Type | By Fixed Frequency | Generating new records at a fixed frequency. |
| Query Frequency | 1 Minute | Generating new records at a 1-minute frequency. |
See the following example:
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Generate Type | ByModelIDs | Generating new records by model IDs. |
| Output Point | eos_site::production_day_trigger | Receiving the output result, which contains the calculated daily energy production data of wind turbines and the generated data record for triggering the calculation of wind farm energy production data. |
See the following example:
Complete the configuration of Record Generate with the following settings:
| Field | Value | Description |
|---|---|---|
| Data Source | Generate New Point | Creating a point and generating data for the point by the specified methods. |
| Date Format | SimpleDateFormat | Using the simple date format for the start time. |
| StartTime | 2019-08-26T00:00:00+08:00 | Specifying the timestamp for the start time of the new point. |
| Time Interval | 1 Minute | Specifying the time interval for generating new point data. |
| Value Generator | Quickly Random Number(Double) | Generating point data randomly. |
See the following example:
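Putting the three tabs together, the operator emits one synthetic record per minute for each asset of the target model. The record's value is random and irrelevant; the record only exists to trigger the downstream wind farm aggregation. A hedged sketch of what one generated record might look like; the field names follow the evaluator scripts below:

    import random
    import time

    def generate_trigger(asset_id, model_id='eos_site'):
        """Build one trigger record for a wind farm asset (illustration only)."""
        return {
            'assetId': asset_id,
            'modelId': model_id,
            'pointId': 'production_day_trigger',
            'time': int(time.time() * 1000),  # millisecond timestamp
            'value': random.random(),         # Quickly Random Number(Double)
        }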
Python Evaluator 1¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_site::production_day_trigger | Receiving the calculated daily energy production data of wind turbines and the generated data record for triggering the calculation of wind farm energy production data as input. |
| Output Point | eos_site::production_day_trigger_withtag | Receiving the output result, which contains the calculated daily energy production data of wind turbines and the triggering points appended with wind farm information and tags. |
See the following example:
Under the Script tab, enter the following script in the Python Script field for appending wind farm information to the generated triggering point and tagging the triggering point:
import time

for record in records:
    try:
        # Mirror the structure that the TSL Parent Asset Lookup produces:
        # here the generated record's assetId is the wind farm itself.
        parentRecord = {'id': record.value['assetId']}
        record.value['attr'] = dict()
        record.value['attr']['tslParentLookup'] = []
        record.value['attr']['tslParentLookup'].append(parentRecord)
        # Tag the record so that downstream scripts recognize it as a trigger.
        record.value['attr']['triggerTag'] = True
        record.value['time'] = long(round(time.time() * 1000))  # current time in ms
        record.value['pointId'] = 'production_day_trigger_withtag'
        output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))
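The script gives the trigger record the same /attr/tslParentLookup[0]/id structure that the TSL Parent Asset Lookup operator adds to the turbine records, so the Latest Record Merger can group the trigger and the turbine records of the same wind farm under one merge key. Note that the script uses Python 2 syntax (long() here, has_key() in Python Evaluator 2 below).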
Latest Record Merger¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_turbine::production_daily_withparent | Receiving the calculated daily energy production data of wind turbines and the wind farm information as input. |
| Input Point | eos_site::production_day_trigger_withtag | Receiving the calculated daily energy production data of wind turbines and the triggering points appended with wind farm information and tags as input. Specifying this point as the trigger of processing. |
| Output Point | eos_site::turbine_merger | Receiving the output result, which contains the wind turbine energy production records merged by the wind farm attribute. |
See the following example:
Complete the configuration of MergeConfig with the following settings:
| Field | Value | Description |
|---|---|---|
| Merged By Expression | ${record:value('/attr/tslParentLookup[0]/id')} | Specifying the expression for generating the tags that are used to merge the latest records. |
| Cache Expire Time (Minute) | 1440 | Specifying how long the cache is kept after the tags stop being updated. |
See the following example:
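Conceptually, the merger caches the latest record per source under each merge key (the wind farm ID extracted by the expression above) and emits the merged set when a record arrives on the trigger input point. A minimal sketch, ignoring the 1440-minute cache expiry; the attr keys latestRecordMerger and mergedRecords match those read by Python Evaluator 2 below:

    latest = {}  # farm_id -> {(assetId, pointId): latest record}

    def merge(record):
        """Cache the record by wind farm; emit the merged set on a trigger."""
        farm_id = record['attr']['tslParentLookup'][0]['id']
        group = latest.setdefault(farm_id, {})
        group[(record['assetId'], record['pointId'])] = record
        if record['pointId'] == 'production_day_trigger_withtag':
            record['attr']['latestRecordMerger'] = {
                'mergedRecords': list(group.values())
            }
            return record  # the merged output for this wind farm
        return None        # non-trigger records are only cached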
Python Evaluator 2¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_site::turbine_merger | Receiving the wind turbine energy production records, merged by the wind farm attribute, as input. |
| Output Point | eos_site::production_day | Receiving the output result, which contains the calculated daily energy production data of wind farms. The output results will be used for further calculation and sent to Kafka as well. |
See the following example:
Under the Script tab, enter the following script in the Python Script field for calculating the daily energy production data of wind farms:
from decimal import Decimal

for record in records:
    try:
        mergedRecords = record.value['attr']['latestRecordMerger']['mergedRecords']
        # Split the merged set into the trigger record and the turbine records.
        triggerRecords = []
        calculateRecords = []
        for mergedRecord in mergedRecords:
            isTriggerPoint = mergedRecord['attr'].has_key('triggerTag') and mergedRecord['attr']['triggerTag']
            if isTriggerPoint:
                triggerRecords.append(mergedRecord)
            else:
                calculateRecords.append(mergedRecord)
        if len(triggerRecords) == 1:
            # Sum the turbine values with Decimal to avoid float accumulation error.
            v_sum = '0.0'
            for calculateRecord in calculateRecords:
                v_sum = str(Decimal(str(calculateRecord['value'])) + Decimal(v_sum))
            record.value['value'] = float(v_sum)
            # Take the timestamp (rounded down to the minute) and the asset
            # information from the trigger record.
            record.value['time'] = triggerRecords[0]['time'] / 60000 * 60000
            record.value['assetId'] = triggerRecords[0]['assetId']
            record.value['modelId'] = triggerRecords[0]['modelId']
            record.value['modelIdPath'] = triggerRecords[0]['modelIdPath']
            record.value['pointId'] = 'production_day'
            record.value['attr'] = dict()
            output.write(record)
        else:
            error.write(record, "invalid record")
    except Exception as e:
        # Send record to error
        error.write(record, str(e))
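Because the Fixed Time Window Aggregator emits an early intermediate total every minute and the Latest Record Merger keeps only the latest record per merge tag, each trigger makes the script sum the most recent daily total of every turbine, yielding an up-to-date wind farm total once per minute.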
Internal HTTP Client¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_site::production_day | Receiving the daily energy production data of wind farms as input. |
| Output Point | eos_site::with_carbonReduction | Receiving the output result, which contains the daily energy production data of wind farms and the queried carbon reduction parameter. |
See the following example:
Complete the configuration of Request with the following settings:
| Field | Value | Description |
|---|---|---|
| Request Method | Get | Selecting the API request method. For the Get Asset API, select the Get method. |
| Request URL | https://{enos-api-gateway}/asset-service/v2.1/assets?action=get | Specifying the API request URL. To get the API gateway URL, go to EnOS Management Console > Help > Environment Information > API Gateway. |
| Param Key | assetId | Specifying the value of the assetId parameter. |
| Param Key | orgId | Specifying the value of the orgId parameter. |
| Access Key | access_key | Specifying the access key of the application that is used for API request authentication. For detailed information, see Registering and Managing Applications. |
| Secret Key | YourSecretKey | Specifying the secret key of the application. |
See the following example:
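For reference, the request issued by the operator has roughly the shape sketched below (using the requests library; the asset ID and org ID are placeholders). This is an illustration only: the operator signs the request with the configured access key and secret key internally, and that signing scheme is not reproduced here, so an unsigned call like this would be rejected by the gateway:

    import requests

    url = 'https://{enos-api-gateway}/asset-service/v2.1/assets'
    params = {
        'action': 'get',
        'assetId': 'your_wind_farm_asset_id',  # placeholder
        'orgId': 'your_org_id',                # placeholder
    }
    response = requests.get(url, params=params)  # real calls must be signed
    # The downstream script reads the carbon reduction parameter from:
    # response.json()['data']['attributes']['carbon.reduction.param']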
Python Evaluator 3¶
Complete the configuration of Input/Output with the following settings:
| Field | Value | Description |
|---|---|---|
| Input Point | eos_site::with_carbonReduction | Receiving the daily energy production data of wind farms and the queried carbon reduction parameter as input. |
| Output Point | eos_site::carbon.reduction.daily | Receiving the output result, which contains the calculated daily carbon reduction data of wind farms. The output results will be sent to Kafka. |
See the following example:
Under the Script tab, enter the following script in the Python Script field for calculating the daily carbon reduction data of wind farms:
import json

for record in records:
    try:
        record.value['pointId'] = 'carbon.reduction.daily'
        production = record.value['value']
        # The Internal HTTP Client stores the raw API response under
        # attr['httpClient']['response'] as a JSON string.
        json_response = json.loads(record.value['attr']['httpClient']['response'])
        carbon = json_response['data']['attributes']['carbon.reduction.param']
        record.value['value'] = production / carbon
        output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))
Validating and running the pipeline¶
When the configuration of the operators is complete, we can validate the configuration and start running the pipeline:
1. Save the configuration of the pipeline.
2. Click the Validate icon in the toolbar to verify the configuration of all the operators.
3. If the validation fails, update the configuration of the operators accordingly.
4. If the validation is successful, click the Release icon in the toolbar to publish the pipeline.
5. Run the pipeline and monitor its running status and results on the Stream Operation page. For detailed steps, see Maintaining Stream Processing Jobs.