消费订阅数据(EnOS Cloud 用户)¶
数据订阅任务启动运行之后,可以使用数据订阅 SDK 开发应用,消费已订阅的数据。
本文介绍使用数据订阅 SDK 的安装和消费订阅数据的代码示例。
EnOS Cloud 支持以下数据订阅 SDK:
Java SDK
Python SDK
.NET SDK
对 EnOS SDK 的详细介绍、最新版本及下载地址,访问 EnOS SDKs 和工具。
使用 Java SDK¶
数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下:
安装数据订阅 Java SDK¶
访问数据订阅 Java SDK 的 Maven 仓库,获取依赖信息:EnOS Cloud Maven 仓库。
在 Java 开发项目文件中添加如下 Maven 依赖,安装数据订阅 SDK:
<dependency> <groupId>com.envisioniot</groupId> <artifactId>subscription-client</artifactId> <version>5.0.2</version> </dependency> <dependency> <groupId>com.envisioniot</groupId> <artifactId>enos-subscribe-impl</artifactId> <version>5.0.2</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
消费实时数据代码示例¶
以下示例为使用指定的 Consumer Group 消费订阅的资产实时数据。如果订阅的数据量较大,可使用同一 Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的效率。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;
public class DataServiceDemo {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
// 订阅分组,相关概念见订阅SDK参考
String consumerGroup = "consumer_group";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取实时数据service
IDataService dataService = eosClient.getDataService();
// 在启动订阅之前需要创建订阅数据处理函数
IDataHandler dataHandler = new IDataHandler(){
public void dataRead(StreamMessage message) {
System.out.println(message);
}
};
// 调用subscribe函数创建订阅连接,调用后订阅连接被创建
// 同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明
dataService.subscribe(dataHandler, subId, consumerGroup);
}
}
备注
在以上示例中, host 和 port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。
每个 Topic Partition 数量为2,即同一个订阅Topic最多只支持2个 Consumer Client 同时消费数据。
一个 Consumer 实例只能消费一个 Topic。
数据在 Topic 中的存储时长默认为3天。
消费基础告警数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo1 {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取告警service
IAlertService alertService = eosClient.getAlertService();
// 在启动订阅之前需要创建订阅数据处理函数
IAlertHandler alertHandler = new IAlertHandler (){
@Override
public void alertRead(Alert alert) {
System.out.println(alert);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
alertService.subscribe(alertHandler, subId);
}
}
消费高级告警数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertHandler;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo2 {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取告警service
IAdvancedAlertService alertService = eosClient.getAdvancedAlertService();
// 在启动订阅之前需要创建订阅数据处理函数
IAdvancedAlertHandler alertHandler = new IAdvancedAlertHandler(){
@Override
public void alertRead(Alert alert) {
System.out.println(alert);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
alertService.subscribe(alertHandler, subId);
}
}
批量消费订阅数据示例¶
以下示例仅展示高级告警和实时数据的订阅类型的使用。
高级告警:
getAdvancedAlertService(boolean isBatch)
实时数据:
getDataService(boolean isBatch)
在回调函数中增加:
eosClient. getAdvancedAlertService(true).subscribe(new IAdvancedAlertHandler() { @Override public void advancedAlertReads(List<String> alerts) { System.out.println("get=>" + alerts); } }, subId);
备注
增加的回调方法为带有 s 的函数,并且回调方法中形参的类型为数组。
消费离线数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的资产离线数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;
public class DataServiceDemo {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取离线数据service
IDataService dataService = eosClient.getOfflineDataService();
// 在启动订阅之前需要创建订阅数据处理函数
IDataHandler dataHandler = new IDataHandler(){
public void dataRead(StreamMessage message) {
System.out.println(message);
}
};
// 调用subscribe函数创建订阅连接,调用后订阅连接被创建
dataService.subscribe(dataHandler, subId);
}
}
消费事件数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的资产事件数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class EventServiceDemo1 {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取事件service
IEventService eventService = eosClient.getEventService();
// 在启动订阅之前需要创建订阅数据处理函数
IEventHandler eventHandler = new IEventHandler (){
@Override
public void eventRead(String event) {
System.out.println(event);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
eventService.subscribe(eventHandler, subId);
}
}
消费设备事件上报数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的设备事件上报数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.report.IReportHandler;
import com.envisioniot.sub.client.report.IReportService;
import com.envisioniot.sub.common.model.report.Report;
public class ReportServiceDemo {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// 根据订阅类型获取相应的service,本示例获取设备事件上报service
IReportService reportService = eosClient.getReportService();
// 在启动订阅之前需要创建订阅数据处理函数
IReportHandler reportHandler = new IReportHandler() {
@Override
public void eventRead(Report report) {
System.out.println(report);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
reportService.subscribe(reportHandler, subId);
}
}
使用 Python SDK¶
数据订阅 Python SDK 的安装和消费订阅数据的代码示例如下:
安装数据订阅 Python SDK¶
数据订阅 Python SDK 支持 Python 2.7 和 Python 3.4 及以上版本。
安装以下依赖模块:
six
google.protobuf
websocket_client
访问数据订阅 Python SDK 的仓库,获取安装代码:https://github.com/EnvisionIot/enos-subscription-service-sdk-python
使用 pip 安装数据订阅 Python SDK:
pip install enos-subscribe
消费实时数据代码示例¶
以下示例为使用 Python SDK 消费订阅的资产实时数据代码示例:
from enos_subscribe import DataClient
if __name__ == '__main__':
client = DataClient(host='sub-host', port='sub-port',
access_key='Your Access Key of this subscription',
access_secret='Your Access Secret of this subscription')
client.subscribe(sub_id='Your subscription Id')
for message in client:
print(message)
备注
在以上示例中, sub-host 和 sub-port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。
数据在 Topic 中的存储时长默认为3天。
消费告警数据代码示例¶
以下示例为使用 Python SDK 消费订阅的告警引擎数据代码示例:
from enos_subscribe import AlertClient
if __name__ == '__main__':
client = AlertClient(host='sub-host', port='sub-port',
access_key='Your Access Key of this subscription',
access_secret='Your Access Secret of this subscription')
client.subscribe(sub_id='Your subscription Id')
for message in client:
print(message)
消费离线数据代码示例¶
以下示例为使用 Python SDK 消费订阅的离线数据代码示例:
from enos_subscribe import OfflineClient
if __name__ == '__main__':
client = OfflineClient(host='sub-host', port='sub-port',
access_key='Your Access Key of this subscription',
access_secret='Your Access Secret of this subscription')
client.subscribe(sub_id='Your subscription Id')
for message in client:
print(message)
消费事件数据代码示例¶
以下示例为使用 Python SDK 消费订阅的事件数据代码:
from enos_subscribe import EventClient
if __name__ == '__main__':
client = EventClient(host='sub-host', port='sub-port',
access_key='Your Access Key of this subscription',
access_secret='Your Access Secret of this subscription')
client.subscribe(sub_id='Your subscription Id')
for message in client:
print(message)
使用 .NET SDK¶
数据订阅 .NET SDK 的安装和消费订阅数据的代码示例如下:
安装数据订阅 .NET SDK¶
数据订阅 .NET SDK 支持 .NET Framework 4.7 及以上版本。
你可通过程序包管理器控制台或使用 SDK 源代码安装最新版的数据订阅 .NET SDK。
通过程序包管理器控制台安装¶
使用以下命令通过程序包管理器控制台安装数据订阅 .NET SDK:
Install-Package enos_subscription -Version 2.4.2
使用源代码安装¶
访问数据订阅 .NET SDK 的仓库,获取安装代码:https://github.com/EnvisionIot/enos-subscription-service-sdk-dotnet
在 Visual Studio 中,将源代码项目添加到解决方案中,并将其添加为项目的引用。
安装依赖模块¶
数据订阅 .NET SDK 需要以下依赖模块:
消费实时数据代码示例¶
以下示例为使用 .NET SDK 消费订阅的资产实时数据代码示例:
using enos_subscription.client;
using (DataClient client = new DataClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
备注
在以上示例中, sub-host 和 sub-port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。
数据在 Topic 中的存储时长默认为3天。
消费告警数据代码示例¶
以下示例为使用 .NET SDK 消费订阅的告警引擎数据代码示例:
using enos_subscription.client;
using (AlertClient client = new AlertClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
消费离线数据代码示例¶
以下示例为使用 .NET SDK 消费订阅的离线数据代码示例:
using enos_subscription.client;
using (OfflineClient client = new OfflineClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
查看订阅任务的运行情况¶
当产生的订阅数据的数据量较大时,你可以查看订阅任务的运行情况,确保订阅数据被及时消费。
登录 EnOS 管理控制台,在订阅任务列表的 操作 一栏中,选择 更多 > 运行情况。
在弹窗中查看产生订阅数据的速率和消费订阅数据的速率,判断消费数据是否有延迟。
备注
在上图示例中,Producer Rates 显示实时数据通道产生订阅数据的速率,Consumer Rates 显示各个 Consumer Group 消费订阅数据的速率。offset 线和数字代表该订阅任务的数据总量;Consumer Group 线和数字代表该 Consumer Group 消费数据的历史和未被消费的数据量。
如果出现消费数据延迟,可使用同一Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的速度。