EDH Kafka Consumer


A data source stage, which consumes data from specified Kafka Topic. It supports both standalone and cluster pipelines. When configuring the EDH Kafka Consumer, the configuration is different for general pipelines and advanced pipelines.


When configuring the EDH Kafka Consumer for general pipelines, the consumed Topic is specified by default and cannot be changed. That is, MEASURE_POINT_INTERNAL_{OUID} Topic for real-time channel, and MEASURE_POINT_INTERNAL_OFFLINE_{OUID} Topic for offline channel. In which, {OUID} is the organization ID.


When configuring the EDH Kafka Consumer for advanced pipelines, the consumed Topic can be specified based on business needs. By default, the system will create the following 6 Topics for use as needed:

  • MEASURE_POINT_INTERNAL_{OUID}

  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}

  • MEASURE_POINT_CAL_{OUID}

  • MEASURE_POINT_CAL_OFFLINE_{OUID}

  • MEASURE_POINT_ORIGIN_{OUD}

  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

In which, {OUID} is the organization ID. You can also request for the Stream Processing - Message Queue resource on the Resource Management page, which can be selected as the consumed Topic.

Configuration

The configuration tabs for this stage are General and Kafka.

General

Name

Required?

Description

Name

Yes

The name of the stage.

Description

No

The description of the stage.

Kafka (General)

Name

Required?

Description

Kafka Configuration

No

Configure advanced Kafka parameters. The parameter auto.offset.reset will be preseted by default. You can change the configuration as needed. For more information, see Kafka Documentation.


Sample configuration is as follows:

../../../_images/kafka_consumer_config_1.png

Kafka (Advanced)

Name

Required?

Description

Topic

Yes

Consumed Kafka Topic.

Kafka Configuration

No

Configure advanced Kafka parameters. The parameters auto.offset.reset and default.api.timeout.ms will be preseted by default. You can change the configuration as needed. For more information, see Kafka Documentation.


Sample configuration is as follows:

../../../_images/kafka_consumer_config_2.png

Data Format

The topics created by the system have strict data formats. Sample for each topic is as follows:

MEASURE_POINT_ORIGIN_{OUID} Data Format

{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

Or:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} Data Format

//Without quality
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

Or:

//With quality
{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload":
     {
            "measurepoints": {
                "tempWithQuality": {
                    "value":23.4,
                    "quality”:0
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        },
   "dq": {
            "measurepoints": {
                "tempWithQuality": 1
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        }
}

Parameters

Field

Corresponding Device Model Field

Description

orgId

TSLModel.ou

Organization ID, to be used by downstream subscription and alert services.

modelId

TSLModel.tslModelId

Model ID (identifier of user defined model), to be used by downstream subscription and alert services.

modelIdPath

n/a

Complete path of user defined model.

assetId

TSLInstance.tslInstanceId

Asset ID, to be used for complying with existing data formats (Real-time stream data format).

time

n/a

Timestamp.

measurepoints

TSLIdentifier.identifier

Measurement point and value.

  • For measurement points without data quality: "temp":0.7121109803730992

  • For measurement points with data quality: "tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} Data Format

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

Parameters

Field

Corresponding CommDataObject Field

Description

orgId

orgId

Organization ID, to be used by downstream subscription and alert services.

modelId

modelId

Model ID (identifier of user defined model), to be used by downstream subscription and alert services.

modelIdPath

modelIdPath

Complete path of user defined model.

assetId

assetId

Asset ID, to be used for complying with existing data formats (Real-time stream data format).

pointId

measurepoints.${key}

Measurement point ID.

time

time

Timestamp.

value

measurepoints.${pointId} or measurepoints.${pointId}.value

Measurement point value:

  • For measurement points without data quality: the value.

  • For measurement points with data quality: need to parse the value field in the map.

quality

null or measurepoints.${pointId}.quality

Measurement point data quality:

  • For measurement points without data quality: 0.

  • For measurement points with data quality: the quality field in the map.

dq

dq.measurepoints.${pointId}

Not required.

attr

n/a

Extra information that is needed in cal (currently not exposed to upstream or downstream services).