教程:使用消息集成从虚拟的第三方系统中集成设备历史数据¶
你可以使用消息集成,从第三方系统中导入设备的历史数据和离线缓存数据。
本教程模拟了如下情境:在第三方系统存有历史数据的某设备,现因业务需要,要将该设备接入EnOS。该设备在第三方系统遗留的测点的历史数据也需要接入EnOS,如下图所示:
接下来本文将指导你完成场景中的历史数据迁移任务。
任务描述¶
本教程需要完成的步骤,如下图所示:
下文的操作将按照流程图中的步骤编号进行,实线箭头表示各步骤之间的依赖关系:
- 创建好模型之后才能配置存储策略
- 创建好产品才能创建消息集成通道
你也可以在遵循依赖关系的前提下自行安排执行每项操作的顺序。
虚线箭头表示测点数据的传输方向:由虚拟的第三方系统、经过消息集成通道、最后到达EnOS的时序数据管理TSDB。
步骤¶
步骤1:创建模型、产品,注册设备¶
创建一个虚拟电流表的模型,其具体参数如下,创建模型的具体步骤参见创建模型:
¶ 字段 取值 模型标识符 test.ammeter 模型名称 虚拟电流表 分类 无 模型关系 无 模型模板 无 模型描述 无 为该设备定义如下的要素:
¶ 字段 取值 功能类型 测点 名称 实时电流 标识符 current 测点类型 AI 是否有质量位 无 数据类型 double 单位 电流:毫安 | mA 自定义标签 无 描述 无 基于 虚拟电流表 模型,创建一个产品,具体参数如下,创建产品的具体方法,参见创建产品(设备集合):
¶ 字段 取值 产品名称 虚拟电流表产品 节点类型 设备 模型 虚拟电流表 数据格式 JSON 证书双向认证 禁用 产品描述 无 基于 虚拟电流表产品 ,注册一个虚拟电流表设备,其参数如下,注册设备的具体方法,参见注册设备:
¶ 字段 取值 产品 虚拟电流表设备 Device Key simulatedAmmeter 设备名称 虚拟电流表设备 时区/城市 UTC+08:00
步骤2:配置时序数据存储策略¶
在 时序数据管理 > 存储策略 中,为 虚拟电流表产品 配置TSDB存储策略。
选择一个已有的策略分组,或者新建一个分组,点击 修改分组 ,勾选 test.ammeter ,点击 确认 ,将该模型纳入到该策略分组中。
点击 AI原始数据 右边的 ,勾选 test.ammeter ,将该模型下所有测点加入当前存储策略。
由于之前配置的测点 实时电流 为AI类型数据,本教程也无需对其进行归一化处理,所以将测点加入 AI原始数据 存储策略中。
步骤3:创建消息集成通道¶
步骤4:使用MQTT Client发送模拟数据¶
- 将如下Java示例代码复制到你使用的代码编辑器,根据注释及下文说明替换其中参数,然后运行:
import org.eclipse.paho.client.mqttv3.*; //使用Eclipse Paho MQTT客户端模拟第三方系统
public class IntegrationSample {
public static String brokerURL = "tcp://tcp-address:port"; //点击通道的 详情 按钮,可以查看Broker URL
public static String clientId = "id12345|securemode=4,signmethod=sha1,timestamp=6355824601597|"; //点击通道的 详情 按钮,可以查看ClientID
public static String userName = "%channel%&12345"; //点击通道的 详情 按钮,可以查看Username
public static String password = "7C4095981435F3ABCD4E6CFFF8F7E27F26975505"; //点击通道的 详情 按钮,可以查看Password
public static MqttClient sampleClient=null;
public static void postPoint(int i) throws MqttException { //以下函数用于构造要发送的Json字符串
String topic = "/sys/O0DUTcD8/integration/measurepoint/post"; //用于测点数据上报请求的MQTT Topic,点击通道的 详情 按钮,可以查看
long currTs=1570024800000L+10*i; //构造一个历史时间的时间戳,并不断累加变量i(整形)以模拟一个时间段。该时间戳必须精确到毫秒,即必须为一个13位的长整型。该时间段须早于当前时间。
double value=28.01+0.1*i; //模拟测点数据
String content="{\n" +
" \"id\":\""+currTs+"\",\n" +
" \"version\":\"1.0\",\n" +
" \"method\":\"integration.measurepoint.post\",\n" +
" \"params\":[\n" +
" {\n" +
" \"deviceKey\":\"simulatedAmmeter\",\n" +
" \"time\":"+currTs+",\n" +
" \"measurepoints\":{\n" +
" \"current\":"+ value +" \n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
//构造一个request的数据,其结构和各参数的含义参见 https://www.envisioniot.com/docs/device-connection/zh_CN/2.3.0/reference/mqtt_offline/report_offline_points.html
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
sampleClient.publish(topic, message); //发布消息
}
public static void main(String[] args) throws MqttException {
sampleClient = new MqttClient(brokerURL, clientId); //创建MQTT客户端
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
//连接
sampleClient.connect(connOpts);
for(int i=0;i<1000;i++ ){ //连续发送1000次测点数据
postPoint(i);
}
sampleClient.disconnect(); //断开连接
sampleClient.close();
}
}
上述代码中的brokerURL
、clientId
、userName
、password
、topic
,可以通过点击 simulated_ammeter_integration 的 详情 按钮以查看:
结果¶
等待一段时间后,选择 时序数据管理 > 时序洞察 。选择 虚拟电流表设备 , 将查询时间段设置为前一步的代码段中currTs
代表的时间段,选择 current 测点。观察到该历史时间段的测点数据已经存储于TSDB当中。