Consuming Subscribed Data for EnOS Cloud Users¶
After the data subscription job starts running, you can use the data subscription SDK to develop applications and consume the subscribed data. This topic shows how to install the data subscription SDK and provides code samples for consuming the subscribed data. For more information on EnOS SDKs, see EnOS SDKs & Tools.
EnOS Cloud supports the following SDKs to subscribe data:
Java SDK
Python SDK
.NET SDK
For details about EnOS SDK and download addresses, see EnOS SDKs and tools。
For Java SDK¶
The steps for installing the Java SDK and consuming the subscribed data are as follows.
Installing Data Subscription SDK for Java¶
Get the Maven dependency information of the data subscription SDK and add it to your development project.
Open the Maven repository of the SDK at Maven dependency for EnOS Cloud.
Open your development environment, and add the maven dependency for the SDK in your Java project. See the following example.
<dependency> <groupId>com.envisioniot</groupId> <artifactId>subscription-client</artifactId> <version>5.0.2</version> </dependency> <dependency> <groupId>com.envisioniot</groupId> <artifactId>enos-subscribe-impl</artifactId> <version>5.0.2</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
Code Sample for Consuming the Subscribed Real-time Data¶
The following code sample is for consuming the subscribed asset real-time data with a specified consumer group. In case of huge data volume, you can run 2 consumer clients of the same consumer group to improve the data consumption efficiency.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;
public class DataServiceDemo {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
// Consumer group name
String consumerGroup = "consumer_group";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the real-time data service
IDataService dataService = eosClient.getDataService();
// Create the data handler function
IDataHandler dataHandler = new IDataHandler(){
public void dataRead(StreamMessage message) {
System.out.println(message);
}
};
// Establish connection with subscription ID and consumer group
dataService.subscribe(dataHandler, subId, consumerGroup);
}
}
Note
The host and port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in to the EnOS Management Console and select Help > Environment Information.
Each subscription topic has 2 partitions. That is, each topic allows at most 2 data consumers at the same time.
Currently, a data consumer can consume 1 topic only.
By default, data is stored in the topic for 3 days.
Code Sample for Consuming the Subscribed Basic Alert Data¶
The following code sample is for consuming the subscribed asset alert data with the default consumer group.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo1 {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the alert data service
IAlertService alertService = eosClient.getAlertService();
// Create the data handler function
IAlertHandler alertHandler = new IAlertHandler (){
@Override
public void alertRead(Alert alert) {
System.out.println(alert);
}
};
// Establish connection with subscription ID
alertService.subscribe(alertHandler, subId);
}
}
Code Sample for Consuming the Subscribed Advanced Alert Data¶
The following code sample is for consuming the subscribed asset alert data with the default consumer group.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertHandler;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo2 {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the alert data service
IAdvancedAlertService alertService = eosClient.getAdvancedAlertService();
// Create the data handler function
IAdvancedAlertHandler alertHandler = new IAdvancedAlertHandler(){
@Override
public void alertRead(Alert alert) {
System.out.println(alert);
}
};
// Establish connection with subscription ID
alertService.subscribe(alertHandler, subId);
}
}
Code Sample for Batch Consuming the Subscribed Data¶
The following code sample is for the use of advanced alerts and real-time data subscription types only.
Advanced Alert:
getAdvancedAlertService(boolean isBatch)
Real-time Data:
getDataService(boolean isBatch)
The following is to be added to the callback function.
eosClient. getAdvancedAlertService(true).subscribe(new IAdvancedAlertHandler() { @Override public void advancedAlertReads(List<String> alerts) { System.out.println("get=>" + alerts); } }, subId);
Note
The added callback method is a function with s, and the type of the formal parameter in the callback method is an array.
Code Sample for Consuming the Subscribed Offline Data¶
The following code sample is for consuming the subscribed asset offline data with the default consumer group.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;
public class DataServiceDemo {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the real-time data service
IDataService dataService = eosClient.getOfflineDataService();
// Create the data handler function
IDataHandler dataHandler = new IDataHandler(){
public void dataRead(StreamMessage message) {
System.out.println(message);
}
};
// Establish connection with subscription ID
dataService.subscribe(dataHandler, subId);
}
}
Code Sample for Consuming the Subscribed Event Data¶
The following code sample is for consuming the subscribed asset event data with the default consumer group.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo1 {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the event data service
IEventService eventService = eosClient.getEventService();
// Create the data handler function
IEventHandler eventHandler = new IEventHandler (){
@Override
public void eventRead(String event) {
System.out.println(event);
}
};
// Establish connection with subscription ID
eventService.subscribe(eventHandler, subId);
}
}
Code Sample for Consuming the Subscribed Device Event Report Data¶
The following code sample is for consuming the subscribed device event report data with the default consumer group.
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.report.IReportHandler;
import com.envisioniot.sub.client.report.IReportService;
import com.envisioniot.sub.common.model.report.Report;
public class ReportServiceDemo {
public static void main(String[] args) throws Exception {
// Host of the data subscription service
String host = "subscription_server";
// Port of the data subscription service
int port = 9001;
// APP authentication (Generated when registering your APP)
String accessKey = "access_key";
// APP authentication
String secretKey = "secret_key";
// Subscription ID
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey);
// Get the device event report data service
IReportService reportService = eosClient.getReportService();
// Create the data handler function
IReportHandler reportHandler = new IReportHandler() {
@Override
public void eventRead(Report report) {
System.out.println(report);
}
};
// Establish connection with subscription ID
reportService.subscribe(reportHandler, subId);
}
}
For Python SDK¶
The steps for installing the Python SDK and consuming the subscribed data are as follows.
Installing Data Subscription SDK for Python¶
The Data Subscription SDK for Python supports Python 2.7, Python 3.4, and newer versions.
Install the following dependency modules.
six
google.protobuf
websocket_client
Open the GitHub repository of the SDK to download the installation code: https://github.com/EnvisionIot/enos-subscription-service-sdk-python
(Optional) Install the Data Subscription SDK for Python with pip.
pip install enos-subscribe
Code Sample for Consuming the Subscribed Real-time Data¶
The following code sample is for consuming the subscribed asset real-time data with the Data Subscription SDK for Python.
if __name__ == '__main__':
client = DataClient(host='Host of the subscription server', port='9001',
access_key='Access key of your APP',
access_secret='Secret key of your APP')
client.subscribe(sub_id='ID of the subscription job')
for message in client:
print(message)
Note
The host and port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in EnOS Management Console and select Help > Environment Information.
By default, data is stored in the Kafka topic for 3 days.
Code Sample for Consuming the Subscribed Alert Data¶
The following code sample is for consuming the subscribed asset alert data with the Data Subscription SDK for Python.
if __name__ == '__main__':
client = AlertClient(host='Host of the subscription server', port='9001',
access_key='Access key of your APP',
access_secret='Secret key of your APP')
client.subscribe(sub_id='ID of the subscription job')
for message in client:
print(message)
Code Sample for Consuming the Subscribed Offline Data¶
The following code sample is for consuming the subscribed asset offline data with the Data Subscription SDK for Python.
if __name__ == '__main__':
client = OfflineDataClient(host='Host of the subscription server', port='9001',
access_key='Access key of your APP',
access_secret='Secret key of your APP')
client.subscribe(sub_id='ID of the subscription job')
for message in client:
print(message)
Code Sample for Consuming the Subscribed Event Data¶
The following code sample is for consuming the subscribed asset event data with the Data Subscription SDK for Python.
from enos_subscribe import EventClient
if __name__ == '__main__':
client = EventClient(host='sub-host', port='sub-port',
access_key='Your Access Key of this subscription',
access_secret='Your Access Secret of this subscription')
client.subscribe(sub_id='Your subscription Id')
for message in client:
print(message)
For .NET SDK¶
The steps for installing the .NET SDK and consuming the subscribed data are as follows.
Installing Data Subscription SDK for .NET¶
The Data Subscription SDK for .NET supports .NET Framework 4.7, and newer versions.
You can install the SDK from Package Manager Console, or build from source.
Installing from Package Manager Console¶
The latest version of EnOS Data Subscription SDK for .NET is available in the Package Manager Console and can be installed using:
Install-Package enos_subscription -Version 2.4.2
Installing from Source¶
Open the GitHub repository of the SDK to download the installation code: https://github.com/EnvisionIot/enos-subscription-service-sdk-dotnet
In Visual Studio, add the source code project into your solution and add as reference to your project.
Installing Dependency Modules¶
The EnOS Data Subscription SDK for .NET has the following dependency modules:
Code Sample for Consuming the Subscribed Real-time Data¶
The following code sample is for consuming the subscribed asset real-time data with the Data Subscription SDK for .NET.
using enos_subscription.client;
using (DataClient client = new DataClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
Note
The sub-host and sub-port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in EnOS Management Console and select Help > Environment Information.
By default, data is stored in the Kafka topic for 3 days.
Code Sample for Consuming the Subscribed Alert Data¶
The following code sample is for consuming the subscribed asset alert data with the Data Subscription SDK for .NET.
using enos_subscription.client;
using (AlertClient client = new AlertClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
Code Sample for Consuming the Subscribed Offline Data¶
The following code sample is for consuming the subscribed asset offline data with the Data Subscription SDK for .NET.
using enos_subscription.client;
using (OfflineClient client = new OfflineClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {
client.subscribe("Your subscription Id", "Your consumer group");
foreach (var message in client.GetMessages())
{
//do something with the message
}
}
Monitoring the Running Statistics of Data Subscription Jobs¶
When a huge volume of data is being subscribed to, you can check the running statistics of the data subscription job to ensure that the subscribed data is consumed in time.
Log in to the EnOS Management Console and open the Data Subscription page. From the Operations column of the data subscription job list, select More (..) > Running Statistics.
In the pop-up window, you can view the producer rates and consumer rates of the data subscription job, and see whether there is any delay of data consumption (by comparing the data producer rates with the data consumer rates).
Note
In the above example, Producer Rates shows the speed of data production from the real-time data channel, and Consumer Rates shows the speed of data consumption by each consumer group. The offset line and number indicate the total volume of subscribed data by the subscription job. The consumer group lines and number indicate the volume of subscribed data that has not been consumed by the consumer group.
If there is a delay in data consumption, you can run 2 consumer clients of the same consumer group to improve the data consumption efficiency.