EDH Kafka Producer


目标算子,向定义的 Kafka Topic 中发送数据,支持单机模式和集群模式。配置 EDH Kafka Producer 算子时,需要区分 “常规流” 和 “高级流” 任务中的配置。


配置常规流数据处理任务时,发送数据的 Topic 为默认值且不可选择,即实时通道 Topic 为 MEASURE_POINT_INTERNAL_{OUID};离线通道 Topic 为 MEASURE_POINT_INTERNAL_OFFLINE_{OUID},其中 {OUID} 为组织 ID。


配置高级流数据处理任务时,发送数据的 Topic 可以自由选择。默认情况下系统会为每个组织创建以下 6 个 Topic 供选择:

  • MEASURE_POINT_INTERNAL_{OUID}
  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
  • MEASURE_POINT_CAL_{OUID}
  • MEASURE_POINT_CAL_OFFLINE_{OUID}
  • MEASURE_POINT_ORIGIN_{OUD}
  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

其中 {OUID} 为组织 ID。另外,用户也可以通过 资源管理 页面申请 流数据处理-消息队列 资源,以便在这里选择对应的 Topic。

配置详情

该算子的配置包括 GeneralKafka 的详细信息,各字段的配置如下:

General

名称 是否必须 描述
Name Yes 算子名称
Description No 算子描述

Kafka(常规流)

名称 是否必须 描述
Kafka Configuration No Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_11.png

Kafka(高级流)

名称 是否必须 描述
Topic Yes 发送数据的 Kafka Topic。
Kafka Configuration No Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.msdelivery.timeout.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_21.png

数据格式说明

系统默认创建的 Topic 有严格的数据格式,分别为:

MEASURE_POINT_ORIGIN_{OUID} 数据格式

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

或:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} 数据格式

//无quality的数据格式
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

或:

//有quality的数据格式
{
        "orgId": "1b47ed98d1800000",
        "modelId": "inverter",
        "modelIdPath": "/rootModel/inverter",
        "payload": {
                "measurepoints": {
                        "tempWithQuality": {
                                "value": 23.4,
                                "quality": 0
                        }
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        },
        "dq": {
                "measurepoints": {
                        "tempWithQuality": 1
                },
                "time": 1542609276270,
                "assetId": "zabPDuHq"
        }
}

参数说明

字段 对应设备模型字段 说明
orgId TSLModel.ou 组织 ID,主要被下游订阅和告警使用。
modelId TSLModel.tslModelId 模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。
modelIdPath TSLModel.tslModelIdPath 用户自定义模型的完整路径。
assetId TSLInstance.tslInstanceId 资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。
time n/a 时间戳
measurepoints TSLIdentifier.identifier

测点和值

  • 不带质量位的点:"temp":0.7121109803730992
  • 带质量位的点:"tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} 数据格式

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

参数说明

字段 CommDataObject 对应字段 说明
orgId orgId 组织 ID,主要被下游订阅和告警使用。
modelId modelId 模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。
modelIdPath modelIdPath 用户自定义模型的完整路径。
assetId assetId 资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。
pointId measurepoints.key 测点唯一标识符。
time time 时间戳
value measurepoints.pointId 或 measurepoints.pointId.value

测点值

  • 不带质量位的点:即 value 值
  • 带质量位的点:需解 map 中的 value 字段
quality null 或 measurepoints.pointId.quality

测点质量位

  • 不带质量位的点:0
  • 带质量位的点:map 中的 quality 字段
dq dq.measurepoints.pointId 非必填字段
attr n/a cal 中需要的额外信息,暂时不对上下游暴露。