EDH Kafka Producer


目标算子,向定义的 Kafka Topic 中发送数据,支持单机模式和集群模式。配置 EDH Kafka Producer 算子时,需要区分 “常规流” 和 “高级流” 任务中的配置。


配置常规流数据处理任务时,发送数据的 Topic 为默认值且不可选择,即实时通道 Topic 为 MEASURE_POINT_INTERNAL_OUID;离线通道 Topic 为 MEASURE_POINT_INTERNAL_OFFLINE_OUID,其中 OUID 为组织 ID。


配置高级流数据处理任务时,发送数据的 Topic 可以自由选择。默认情况下系统会为每个组织创建以下 6 个 Topic 供选择:

  • MEASURE_POINT_INTERNAL_{OUID}

  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}

  • MEASURE_POINT_CAL_{OUID}

  • MEASURE_POINT_CAL_OFFLINE_{OUID}

  • MEASURE_POINT_ORIGIN_{OUD}

  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

其中 {OUID} 为组织 ID。另外,用户也可以通过 资源管理 页面申请 流数据处理-消息队列 资源,以便在这里选择对应的 Topic。

配置详情

该算子的配置包括 GeneralKafka 的详细信息,各字段的配置如下:

General

名称

是否必须

描述

Name

Yes

算子名称

Description

No

算子描述

Kafka(常规流)

名称

是否必须

描述

Kafka Configuration

No

Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_11.png

Kafka(高级流)

名称

是否必须

描述

Topic

Yes

发送数据的 Kafka Topic。

Kafka Configuration

No

Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.msdelivery.timeout.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_21.png

数据格式说明

系统默认创建的 Topic 有严格的数据格式,分别为:

MEASURE_POINT_ORIGIN_{OUID} 数据格式

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

或:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} 数据格式

//无quality的数据格式
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

或:

//有quality的数据格式
{
        "orgId": "1b47ed98d1800000",
        "modelId": "inverter",
        "modelIdPath": "/rootModel/inverter",
        "payload": {
                "measurepoints": {
                        "tempWithQuality": {
                                "value": 23.4,
                                "quality": 0
                        }
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        },
        "dq": {
                "measurepoints": {
                        "tempWithQuality": 1
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        }
}

参数说明

字段

对应设备模型字段

说明

orgId

TSLModel.ou

组织 ID,主要被下游订阅和告警使用。

modelId

TSLModel.tslModelId

模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。

modelIdPath

TSLModel.tslModelIdPath

用户自定义模型的完整路径。

assetId

TSLInstance.tslInstanceId

资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。

time

n/a

时间戳

measurepoints

TSLIdentifier.identifier

测点和值

  • 不带质量位的点:"temp":0.7121109803730992

  • 带质量位的点:"tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} 数据格式

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

参数说明

字段

CommDataObject 对应字段

说明

orgId

orgId

组织 ID,主要被下游订阅和告警使用。

modelId

modelId

模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。

modelIdPath

modelIdPath

用户自定义模型的完整路径。

assetId

assetId

资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。

pointId

measurepoints.key

测点唯一标识符。

time

time

时间戳

value

measurepoints.pointId 或 measurepoints.pointId.value

测点值

  • 不带质量位的点:即 value 值

  • 带质量位的点:需解 map 中的 value 字段

quality

null 或 measurepoints.pointId.quality

测点质量位

  • 不带质量位的点:0

  • 带质量位的点:map 中的 quality 字段

dq

dq.measurepoints.pointId

非必填字段

attr

n/a

cal 中需要的额外信息,暂时不对上下游暴露。