教程:使用消息集成从虚拟的第三方系统中集成设备历史数据

你可以使用消息集成,从第三方系统中导入设备的历史数据和离线缓存数据。

本教程模拟了如下情境:在第三方系统存有历史数据的某设备,现因业务需要,要将该设备接入EnOS。该设备在第三方系统遗留的测点的历史数据也需要接入EnOS,如下图所示:

../../../_images/historical_message_integration_overview.png

接下来本文将指导你完成场景中的历史数据迁移任务。

任务描述

本教程需要完成的步骤,如下图所示:

../../../_images/historical_message_integration_flowchart.png

下文的操作将按照流程图中的步骤编号进行,实线箭头表示各步骤之间的依赖关系:

  • 创建好模型之后才能配置存储策略
  • 创建好产品才能创建消息集成通道

你也可以在遵循依赖关系的前提下自行安排执行每项操作的顺序。

虚线箭头表示测点数据的传输方向:由虚拟的第三方系统、经过消息集成通道、最后到达EnOS的时序数据管理TSDB。

前提条件

  • 你需要有以下功能的权限,如果没有,请联系组织管理员添加,参见策略,角色,与权限
    • 模型管理
    • 设备管理
    • 时序数据管理
  • 在自己的计算机上安装了Java开发环境。

步骤

步骤 1:创建模型、产品,注册设备

  1. 创建一个虚拟电流表的模型,其具体参数如下,创建模型的具体步骤参见创建模型

    虚拟电流表模型
    字段 取值
    模型标识符 test.ammeter
    模型名称 虚拟电流表
    分类
    模型关系
    模型模板
    模型描述
  2. 为该设备定义如下的要素:

    虚拟电流表模型要素
    字段 取值
    功能类型 测点
    名称 实时电流
    标识符 current
    测点类型 AI
    是否有质量位
    数据类型 double
    单位 电流:毫安 | mA
    自定义标签
    描述
  3. 基于 虚拟电流表 模型,创建一个产品,具体参数如下,创建产品的具体方法,参见创建产品(设备集合)

    虚拟电流表产品
    字段 取值
    产品名称 虚拟电流表产品
    节点类型 设备
    模型 虚拟电流表
    数据格式 JSON
    证书双向认证 禁用
    产品描述
  4. 基于 虚拟电流表产品,注册一个虚拟电流表设备,其参数如下,注册设备的具体方法,参见注册设备

    虚拟电流表设备
    字段 取值
    产品 虚拟电流表设备
    Device Key simulatedAmmeter
    设备名称 虚拟电流表设备
    时区/城市 UTC+08:00

步骤 2:配置时序数据存储策略

  1. 时序数据管理 > 存储策略 中,为 虚拟电流表产品 配置TSDB存储策略。

    1. 选择一个已有的策略分组,或者新建一个分组,点击 修改分组,勾选 test.ammeter,点击 确认,将该模型纳入到该策略分组中。

    2. 点击 AI原始数据 右边的 edit ,勾选 test.ammeter,将该模型下所有测点加入当前存储策略。

      由于之前配置的测点 实时电流 为AI类型数据,本教程也无需对其进行归一化处理,所以将测点加入 AI原始数据 存储策略中。

步骤 3:创建消息集成通道

  1. 设备连接 > 消息集成 中,为 虚拟电流表产品 创建一条消息集成通道如下。创建通道的操作,参见集成设备的离线消息

    消息集成通道参数
    字段 取值
    Channel Name simulated_ammeter_integration
    Protocol MQTT
    Target Product Simulated Ammeter Product
    Data Format JSON
    Description /

步骤 4:使用MQTT Client发送模拟数据

  1. 将如下Java示例代码复制到你使用的代码编辑器,根据注释及下文说明替换其中参数,然后运行:
import org.eclipse.paho.client.mqttv3.*; //使用Eclipse Paho MQTT客户端模拟第三方系统

public class IntegrationSample {

  public static String brokerURL = "tcp://tcp-address:port"; //点击通道的 详情 按钮,可以查看Broker URL
  public static String clientId = "id12345|securemode=4,signmethod=sha1,timestamp=6355824601597|"; //点击通道的 详情 按钮,可以查看ClientID
  public static String userName = "%channel%&12345"; //点击通道的 详情 按钮,可以查看Username
  public static String password = "7C4095981435F3ABCD4E6CFFF8F7E27F26975505"; //点击通道的 详情 按钮,可以查看Password
  public static MqttClient sampleClient=null;


  public static void postPoint(int i) throws MqttException { //以下函数用于构造要发送的Json字符串
    String topic = "/sys/O0DUTcD8/integration/measurepoint/post"; //用于测点数据上报请求的MQTT Topic,点击通道的 详情 按钮,可以查看
    long currTs=1570024800000L+10*i; //构造一个历史时间的时间戳,并不断累加变量i(整形)以模拟一个时间段。该时间戳必须精确到毫秒,即必须为一个13位的长整型。该时间段须早于当前时间。
    double value=28.01+0.1*i; //模拟测点数据

    String content="{\n" +
            "    \"id\":\""+currTs+"\",\n" +
            "    \"version\":\"1.0\",\n" +
            "    \"method\":\"integration.measurepoint.post\",\n" +
            "    \"params\":[\n" +
            "        {\n" +
            "            \"deviceKey\":\"simulatedAmmeter\",\n" +
            "            \"time\":"+currTs+",\n" +
            "            \"measurepoints\":{\n" +
            "                \"current\":"+ value +" \n" +
            "            }\n" +
            "        }\n" +
            "    ]\n" +
            "}";
    //构造一个request的数据,其结构和各参数的含义参见 https://www.envisioniot.com/docs/device-connection/zh_CN/2.4.0/reference/mqtt_offline/report_offline_points.html

    MqttMessage message = new MqttMessage(content.getBytes());
    message.setQos(1);


    sampleClient.publish(topic, message); //发布消息
    }

  public static void main(String[] args) throws  MqttException {
    sampleClient = new MqttClient(brokerURL, clientId); //创建MQTT客户端
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(false);
    connOpts.setUserName(userName);
    connOpts.setPassword(password.toCharArray());
    //连接
    sampleClient.connect(connOpts);


    for(int i=0;i<1000;i++ ){ //连续发送1000次测点数据
    postPoint(i);
    }


    sampleClient.disconnect(); //断开连接
    sampleClient.close();
  }
}

上述代码中的brokerURLclientIduserNamepasswordtopic,可以通过点击 simulated_ammeter_integration详情 按钮以查看:

../../../_images/simulated_ammeter_channel_params.png

结果

等待一段时间后,选择 时序数据管理 > 时序洞察。选择 虚拟电流表设备, 将查询时间段设置为前一步的代码段中currTs代表的时间段,选择 current 测点。观察到该历史时间段的测点数据已经存储于TSDB当中。

../../../_images/simulated_ammeter_integration_result.png