EDH Kafka Consumer


A data source stage, which consumes data from specified Kafka Topic. It supports both standalone and cluster pipelines. When configuring the EDH Kafka Consumer, the configuration is different for general pipelines and advanced pipelines.


When configuring the EDH Kafka Consumer for general pipelines, the consumed Topic is specified by default and cannot be changed. That is, MEASURE_POINT_INTERNAL_{OUID} Topic for real-time channel, and MEASURE_POINT_INTERNAL_OFFLINE_{OUID} Topic for offline channel. In which, {OUID} is the organization ID.


When configuring the EDH Kafka Consumer for advanced pipelines, the consumed Topic can be specified based on business needs. By default, the system will create the following 6 Topics for use as needed:

  • MEASURE_POINT_INTERNAL_{OUID}
  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
  • MEASURE_POINT_CAL_{OUID}
  • MEASURE_POINT_CAL_OFFLINE_{OUID}
  • MEASURE_POINT_ORIGIN_{OUD}
  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

In which, {OUID} is the organization ID. You can also request for the Stream Processing - Message Queue resource on the Resource Management page, which can be selected as the consumed Topic.

Configuration

The configuration tabs for this stage are General and Kafka.

General

Name Required? Description
Name Yes The name of the stage.
Description No The description of the stage.

Kafka (General)

Name Required? Description
Kafka Configuration No Configure advanced Kafka parameters. The parameter auto.offset.reset will be preseted by default. You can change the configuration as needed. For more information, see Kafka Documentation.


Sample configuration is as follows:

../../../_images/kafka_consumer_config_11.png

Kafka (Advanced)

Name Required? Description
Topic Yes Consumed Kafka Topic.
Kafka Configuration No Configure advanced Kafka parameters. The parameters auto.offset.reset and default.api.timeout.ms will be preseted by default. You can change the configuration as needed. For more information, see Kafka Documentation.


Sample configuration is as follows:

../../../_images/kafka_consumer_config_21.png

Data Format

The topics created by the system have strict data formats. Sample for each topic is as follows:

MEASURE_POINT_ORIGIN_{OUID} Data Format

{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

Or:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} Data Format

//Without quality
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

Or:

//With quality
{
        "orgId": "1b47ed98d1800000",
        "modelId": "inverter",
        "modelIdPath": "/rootModel/inverter",
        "payload": {
                "measurepoints": {
                        "tempWithQuality": {
                                "value": 23.4,
                                "quality": 0
                        }
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        },
        "dq": {
                "measurepoints": {
                        "tempWithQuality": 1
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        }
}

Parameters

Field Corresponding Device Model Field Description
orgId TSLModel.ou Organization ID, to be used by downstream subscription and alert services.
modelId TSLModel.tslModelId Model ID (identifier of user defined model), to be used by downstream subscription and alert services.
modelIdPath TSLModel.tslModelIdPath Complete path of user defined model.
assetId TSLInstance.tslInstanceId Asset ID, to be used for complying with existing data formats (Real-time stream data format).
time n/a Timestamp.
measurepoints TSLIdentifier.identifier

Measurement point and value.

  • For measurement points without data quality: "temp":0.7121109803730992
  • For measurement points with data quality: "tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} Data Format

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

Parameters

Field Corresponding CommDataObject Field Description
orgId orgId Organization ID, to be used by downstream subscription and alert services.
modelId modelId Model ID (identifier of user defined model), to be used by downstream subscription and alert services.
modelIdPath modelIdPath Complete path of user defined model.
assetId assetId Asset ID, to be used for complying with existing data formats (Real-time stream data format).
pointId measurepoints.key Measurement point ID.
time time Timestamp.
value measurepoints.pointId or measurepoints.pointId.value

Measurement point value:

  • For measurement points without data quality: the value.
  • For measurement points with data quality: need to parse the value field in the map.
quality null or measurepoints.pointId.quality

Measurement point data quality:

  • For measurement points without data quality: 0.
  • For measurement points with data quality: the quality field in the map.
dq dq.measurepoints.pointId Not required.
attr n/a Extra information that is needed in cal (currently not exposed to upstream or downstream services).