EDH Kafka Producer


A destination stage that sends data to a specified Kafka Topic. It supports both standalone and cluster pipelines. The configuration of the EDH Kafka Producer differs between general pipelines and advanced pipelines.


When you configure the EDH Kafka Producer for a general pipeline, the Topic that receives data is preset and cannot be changed: MEASURE_POINT_INTERNAL_{OUID} for the real-time channel and MEASURE_POINT_INTERNAL_OFFLINE_{OUID} for the offline channel, where {OUID} is the organization ID.


When you configure the EDH Kafka Producer for an advanced pipeline, you can specify the Topic that receives data based on business needs. By default, the system creates the following 6 Topics for use as needed:

  • MEASURE_POINT_INTERNAL_{OUID}
  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
  • MEASURE_POINT_CAL_{OUID}
  • MEASURE_POINT_CAL_OFFLINE_{OUID}
  • MEASURE_POINT_ORIGIN_{OUID}
  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

Here, {OUID} is the organization ID. You can also request a Stream Processing - Message Queue resource on the Resource Management page and then select it as the Topic to receive data.
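As an illustration of the naming scheme, the following minimal sketch expands the six default Topic name patterns for one organization. The helper name default_topics and the constant DEFAULT_TOPIC_PATTERNS are hypothetical and not part of the product; the organization ID is the one used in the samples on this page.

```python
from typing import List

# Default Topic name patterns as listed above; {OUID} is the organization ID.
DEFAULT_TOPIC_PATTERNS = [
    "MEASURE_POINT_INTERNAL_{OUID}",
    "MEASURE_POINT_INTERNAL_OFFLINE_{OUID}",
    "MEASURE_POINT_CAL_{OUID}",
    "MEASURE_POINT_CAL_OFFLINE_{OUID}",
    "MEASURE_POINT_ORIGIN_{OUID}",
    "MEASURE_POINT_ORIGIN_OFFLINE_{OUID}",
]


def default_topics(ou_id: str) -> List[str]:
    """Return the default Topic names with {OUID} replaced by the organization ID."""
    if not ou_id:
        raise ValueError("ou_id must be a non-empty organization ID")
    return [pattern.replace("{OUID}", ou_id) for pattern in DEFAULT_TOPIC_PATTERNS]


if __name__ == "__main__":
    for topic in default_topics("1b47ed98d1800000"):
        print(topic)
```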

Configuration

The configuration tabs for this stage are General and Kafka.

General

Name | Required? | Description
Name | Yes | The name of the stage.
Description | No | The description of the stage.

Kafka (General)

Name | Required? | Description
Kafka Configuration | No | Advanced Kafka parameters. The parameters retries, max.in.flight.requests.per.connection, and retry.backoff.ms are preset by default; you can change them as needed. For more information, see the Kafka documentation.


A sample configuration is shown below:

../../../_images/kafka_producer_config_11.png

Kafka (Advanced)

Name | Required? | Description
Topic | Yes | The Kafka Topic that receives data.
Kafka Configuration | No | Advanced Kafka parameters. The parameters retries, max.in.flight.requests.per.connection, retry.backoff.ms, and delivery.timeout.ms are preset by default; you can change them as needed. For more information, see the Kafka documentation.


A sample configuration is shown below:

../../../_images/kafka_producer_config_21.png
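To make the idea of preset parameters that can be overridden more concrete, here is a minimal sketch of merging user-supplied Kafka properties over a set of presets. The property names are standard Kafka producer settings named above; the values in ASSUMED_PRESETS are placeholders chosen for illustration and are not the platform's actual presets, and effective_config is a hypothetical helper.

```python
from typing import Any, Dict

# Placeholder values for illustration only; check the stage UI for the real presets.
ASSUMED_PRESETS: Dict[str, str] = {
    "retries": "2147483647",
    "max.in.flight.requests.per.connection": "1",
    "retry.backoff.ms": "100",
    "delivery.timeout.ms": "120000",
}


def effective_config(overrides: Dict[str, Any]) -> Dict[str, str]:
    """Merge user-supplied Kafka properties over the assumed presets."""
    merged = dict(ASSUMED_PRESETS)
    # Kafka properties are commonly entered as strings, so normalize the values.
    merged.update({key: str(value) for key, value in overrides.items()})
    return merged


if __name__ == "__main__":
    for key, value in sorted(effective_config({"retries": 5}).items()):
        print(f"{key}={value}")
```

Any property entered in Kafka Configuration would play the role of an override here; properties that are not overridden keep their preset values.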

Data Format

The Topics created by the system enforce strict data formats. Samples are as follows:

MEASURE_POINT_ORIGIN_{OUID} Data Format

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": [
        {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
        {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

Or:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}
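Because payload can be either a single object or an array of objects, a consumer or an upstream stage may find it convenient to normalize both shapes before processing. The following is a minimal sketch of that normalization; payload_entries is a hypothetical helper, and the sample message is a shortened version of the samples above.

```python
import json
from typing import Any, Dict, List


def payload_entries(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the payload as a list, whether it was sent as one object or an array."""
    payload = message["payload"]
    return payload if isinstance(payload, list) else [payload]


# Single-object form, shortened from the sample above.
single = json.loads("""
{
    "orgId": "1b47ed98d1800000",
    "modelId": "inverter",
    "modelIdPath": "/rootModel/inverter",
    "payload": {
        "measurepoints": {"temp": 0.7121109803730992},
        "time": 1542609276269,
        "assetId": "zabPDuHq"
    }
}
""")

# Array form: the same entry plus a second one with a later timestamp.
batch = dict(single, payload=[single["payload"],
                              dict(single["payload"], time=1542609276270)])

for entry in payload_entries(batch):
    print(entry["assetId"], entry["time"])
```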

MEASURE_POINT_CAL_{OUID} Data Format

Without quality:

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

Or:

With quality:

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithQuality": {
                "value":23.4,
                "quality":0
            }
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}
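Both CAL samples pair payload with a dq block that repeats time and assetId and carries one flag per measurement point. The sketch below assembles a message of that shape. The function build_cal_message is hypothetical, and requiring a dq flag for every point is an assumption made for this example; this page does not define the meaning of the flag values.

```python
from typing import Any, Dict


def build_cal_message(org_id: str, model_id: str, model_id_path: str,
                      asset_id: str, time_ms: int,
                      points: Dict[str, Any],
                      dq_flags: Dict[str, int]) -> Dict[str, Any]:
    """Assemble a message shaped like the CAL samples: payload plus a mirrored dq block."""
    # Assumption for this sketch: every measurement point needs a dq flag.
    missing = set(points) - set(dq_flags)
    if missing:
        raise ValueError(f"no dq flag for: {sorted(missing)}")
    return {
        "orgId": org_id,
        "modelId": model_id,
        "modelIdPath": model_id_path,
        "payload": {"measurepoints": points, "time": time_ms, "assetId": asset_id},
        "dq": {
            "measurepoints": {name: dq_flags[name] for name in points},
            "time": time_ms,
            "assetId": asset_id,
        },
    }


message = build_cal_message(
    "1b47ed98d1800000", "inverter", "/rootModel/inverter",
    "zabPDuHq", 1542609276270,
    points={"tempWithQuality": {"value": 23.4, "quality": 0}},
    dq_flags={"tempWithQuality": 1},
)
print(message["dq"])
```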

Parameters

Field | Corresponding Device Model Field | Description
orgId | TSLModel.ou | Organization ID, used by downstream subscription and alert services.
modelId | TSLModel.tslModelId | Model ID (identifier of the user-defined model), used by downstream subscription and alert services.
modelIdPath | TSLModel.tslModelIdPath | Complete path of the user-defined model.
assetId | TSLInstance.tslInstanceId | Asset ID, kept for compatibility with the existing real-time stream data format.
time | n/a | Timestamp.
measurepoints | TSLIdentifier.identifier | Measurement points and their values. Without data quality: "temp":0.7121109803730992. With data quality: "tempWithQuality": { "value":23.4, "quality":9 }.

MEASURE_POINT_INTERNAL_{OUID} Data Format

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

Parameters

Field | Corresponding CommDataObject Field | Description
orgId | orgId | Organization ID, used by downstream subscription and alert services.
modelId | modelId | Model ID (identifier of the user-defined model), used by downstream subscription and alert services.
modelIdPath | modelIdPath | Complete path of the user-defined model.
assetId | assetId | Asset ID, kept for compatibility with the existing real-time stream data format.
pointId | measurepoints.key | Measurement point ID.
time | time | Timestamp.
value | measurepoints.pointId or measurepoints.pointId.value | Measurement point value. Without data quality: the value itself. With data quality: the value field of the map.
quality | null or measurepoints.pointId.quality | Measurement point data quality. Without data quality: 0. With data quality: the quality field of the map.
dq | dq.measurepoints.pointId | Not required.
attr | n/a | Extra information needed in cal (currently not exposed to upstream or downstream services).
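The field mapping above can be sketched as a function that flattens a payload-style message into one INTERNAL-style record per measurement point. This is an illustrative sketch, not the platform's implementation: to_internal_records is a hypothetical name, and defaulting dq to 0 when no dq block is supplied is an assumption based on the sample record above.

```python
from typing import Any, Dict, List


def to_internal_records(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a payload-style message into one INTERNAL-style record per point."""
    payload = message["payload"]
    entries = payload if isinstance(payload, list) else [payload]
    # dq is documented as not required, so fall back to an empty mapping.
    dq_points = message.get("dq", {}).get("measurepoints", {})

    records = []
    for entry in entries:
        for point_id, raw in entry["measurepoints"].items():
            if isinstance(raw, dict):
                # Point with data quality: read both fields from the map.
                value, quality = raw["value"], raw["quality"]
            else:
                # Point without data quality: quality is 0 per the table.
                value, quality = raw, 0
            records.append({
                "orgId": message["orgId"],
                "modelId": message["modelId"],
                "modelIdPath": message["modelIdPath"],
                "assetId": entry["assetId"],
                "pointId": point_id,
                "time": entry["time"],
                "value": value,
                "quality": quality,
                "dq": dq_points.get(point_id, 0),  # assumption: 0 when no dq is given
                "attr": {},
            })
    return records


cal_message = {
    "orgId": "1b47ed98d1800000",
    "modelId": "inverter",
    "modelIdPath": "/rootModel/inverter",
    "payload": {
        "measurepoints": {"tempWithQuality": {"value": 23.4, "quality": 0}},
        "time": 1542609276270,
        "assetId": "zabPDuHq",
    },
    "dq": {
        "measurepoints": {"tempWithQuality": 1},
        "time": 1542609276270,
        "assetId": "zabPDuHq",
    },
}

for record in to_internal_records(cal_message):
    print(record)
```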