EDH Kafka Producer


目标算子,向定义的 Kafka Topic 中发送数据,支持单机模式和集群模式。配置 EDH Kafka Producer 算子时,需要区分 “常规流” 和 “高级流” 任务中的配置。


配置常规流数据处理任务时,发送数据的 Topic 为默认值且不可选择,即实时通道 Topic 为 MEASURE_POINT_INTERNAL_${OUID};离线通道 Topic 为 MEASURE_POINT_INTERNAL_OFFLINE_${OUID},其中 {OUID} 为组织 ID。


配置高级流数据处理任务时,发送数据的 Topic 可以自由选择。默认情况下系统会为每个组织创建以下6个 Topic 供选择:

  • MEASURE_POINT_INTERNAL_{OUID}
  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
  • MEASURE_POINT_CAL_{OUID}
  • MEASURE_POINT_CAL_OFFLINE_{OUID}
  • MEASURE_POINT_ORIGIN_{OUD}
  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

其中 {OUID} 为组织 ID。另外,用户也可以通过 资源管理 页面申请 流数据处理-消息队列 资源,以便在这里选择对应的 Topic。

配置详情

该算子的配置包括 GeneralKafka 的详细信息,各字段的配置如下:

General

名称 是否必须 描述
Name Yes 算子名称
Description No 算子描述

Kafka(常规流)

名称 是否必须 描述
Kafka Configuration No Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_1.png

Kafka(高级流)

名称 是否必须 描述
Topic Yes 发送数据的 Kafka Topic。
Kafka Configuration No Kafka 高级参数设置,默认会填充 retriesmax.in.flight.requests.per.connectionretry.backoff.msdelivery.timeout.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_producer_config_2.png

数据格式说明

系统默认创建的 Topic 有严格的数据格式,分别为:

MEASURE_POINT_ORIGIN_{OUID} 数据格式

{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

或:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} 数据格式

//无quality的数据格式
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

或:

//有quality的数据格式
{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload":
     {
            "measurepoints": {
                "tempWithQuality": {
                    "value":23.4,
                    "quality”:0
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        },
   "dq": {
            "measurepoints": {
                "tempWithQuality": 1
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        }
}

参数说明

字段 对应设备模型字段 说明
orgId TSLModel.ou 组织 ID,主要被下游订阅和告警使用。
modelId TSLModel.tslModelId 模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。
modelIdPath n/a 用户自定义模型的完整路径。
assetId TSLInstance.tslInstanceId 资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。
time n/a 时间戳
measurepoints TSLIdentifier.identifier

测点和值

  • 不带质量位的点:"temp":0.7121109803730992
  • 带质量位的点:"tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} 数据格式

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

参数说明

字段 CommDataObject 对应字段 说明
orgId orgId 组织 ID,主要被下游订阅和告警使用。
modelId modelId 模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。
modelIdPath modelIdPath 用户自定义模型的完整路径。
assetId assetId 资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。
pointId measurepoints.${key} 测点唯一标识符。
time time 时间戳
value measurepoints.${pointId} 或 measurepoints.${pointId}.value

测点值

  • 不带质量位的点:即 value 值
  • 带质量位的点:需解 map 中的 value 字段
quality null 或 measurepoints.${pointId}.quality

测点质量位

  • 不带质量位的点:0
  • 带质量位的点:map 中的 quality 字段
dq dq.measurepoints.${pointId} 非必填字段
attr n/a cal 中需要的额外信息,暂时不对上下游暴露。