Consuming Subscribed Data for EnOS Cloud Users


After the data subscription job starts running, you can use the data subscription SDK to develop applications and consume the subscribed data. This topic shows how to install the data subscription SDK and provides code samples for consuming the subscribed data. For more information on EnOS SDKs, see EnOS SDKs & Tools.

EnOS Cloud provides the following SDKs for consuming subscribed data:

  • Java SDK

  • Python SDK

  • .NET SDK

For details about the EnOS SDKs and their download addresses, see EnOS SDKs and Tools.

For Java SDK

The steps for installing the Java SDK and consuming the subscribed data are as follows.

Installing Data Subscription SDK for Java

Get the Maven dependency information of the data subscription SDK and add it to your development project.

  1. Open the Maven repository of the SDK at Maven dependency for EnOS Cloud.

  2. Open your development environment, and add the Maven dependency for the SDK to your Java project. See the following example.

    <dependency>
        <groupId>com.envisioniot</groupId>
        <artifactId>subscription-client</artifactId>
        <version>5.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.envisioniot</groupId>
        <artifactId>enos-subscribe-impl</artifactId>
        <version>5.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    

Code Sample for Consuming the Subscribed Real-time Data

The following code sample is for consuming the subscribed asset real-time data with a specified consumer group. If the data volume is large, you can run 2 consumer clients in the same consumer group to consume the data faster.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class DataServiceDemo {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        // Consumer group name
        String consumerGroup = "consumer_group";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the real-time data service
        IDataService dataService = eosClient.getDataService();

        // Create the data handler function
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // Establish connection with subscription ID and consumer group
        dataService.subscribe(dataHandler, subId, consumerGroup);
    }
}

Note

  • The host and port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in to the EnOS Management Console and select Help > Environment Information.

  • Each subscription topic has 2 partitions. That is, each topic allows at most 2 data consumers at the same time.

  • Currently, a data consumer can consume 1 topic only.

  • By default, data is stored in the topic for 3 days.
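The limit of 2 consumers per topic follows from the partition count: within one consumer group, each partition is read by a single consumer. The following Python sketch is only an illustrative model of that rule, not the service's actual assignment logic; the function `assign_partitions` and the client names are hypothetical.

```python
def assign_partitions(consumers, partition_count=2):
    """Spread the partitions of one topic over the consumers of one group.

    Illustrative round-robin model only; the real assignment strategy
    of the subscription service is not described in this topic.
    """
    assignment = {name: [] for name in consumers}
    if not consumers:
        return assignment
    for partition in range(partition_count):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


if __name__ == "__main__":
    # One client reads both partitions.
    print(assign_partitions(["client-a"]))
    # Two clients read one partition each.
    print(assign_partitions(["client-a", "client-b"]))
    # A third client in the same group is left without a partition.
    print(assign_partitions(["client-a", "client-b", "client-c"]))
```

In this model, adding a third client to the same consumer group does not increase throughput, because no partition is left for it to read.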

Code Sample for Consuming the Subscribed Basic Alert Data

The following code sample is for consuming the subscribed asset alert data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the alert data service
        IAlertService alertService = eosClient.getAlertService();

        // Create the data handler function
        IAlertHandler alertHandler = new IAlertHandler() {
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // Establish connection with subscription ID
        alertService.subscribe(alertHandler, subId);
    }
}

Code Sample for Consuming the Subscribed Advanced Alert Data

The following code sample is for consuming the subscribed asset advanced alert data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertHandler;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo2 {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the advanced alert data service
        IAdvancedAlertService alertService = eosClient.getAdvancedAlertService();

        // Create the data handler function
        IAdvancedAlertHandler alertHandler = new IAdvancedAlertHandler(){
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // Establish connection with subscription ID
        alertService.subscribe(alertHandler, subId);
    }
}

Code Sample for Batch Consuming the Subscribed Data

Batch consumption is available for the advanced alert and real-time data subscription types only. To enable it, pass true as the isBatch argument when getting the service:

  • Advanced Alert: getAdvancedAlertService(boolean isBatch)

  • Real-time Data: getDataService(boolean isBatch)

Then implement the batch callback method in the handler, as in the following example.

    eosClient.getAdvancedAlertService(true).subscribe(new IAdvancedAlertHandler() {
        @Override
        public void advancedAlertReads(List<String> alerts) {
            System.out.println("get=>" + alerts);
        }
    }, subId);
    

Note

In batch mode, the callback method name ends with an "s" (advancedAlertReads in the example above), and its parameter is a list of records rather than a single record.
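The difference between the two delivery modes can be modeled outside the SDK. The following Python sketch is a language-neutral illustration, not SDK code: the class `RecordingHandler`, its methods `alert_read` and `alert_reads`, and the function `dispatch` are all hypothetical names chosen to mirror the single-record and batch callbacks.

```python
class RecordingHandler:
    """Hypothetical handler with a single-record and a batch callback."""

    def __init__(self):
        self.seen = []

    def alert_read(self, alert):
        # Single-record mode: called once for every alert.
        self.seen.append(alert)

    def alert_reads(self, alerts):
        # Batch mode: called once for a whole list of alerts.
        self.seen.extend(alerts)


def dispatch(handler, alerts, is_batch):
    """Deliver alerts to the handler and return the number of callback calls."""
    if is_batch:
        handler.alert_reads(list(alerts))
        return 1
    for alert in alerts:
        handler.alert_read(alert)
    return len(alerts)


if __name__ == "__main__":
    alerts = ["alert-1", "alert-2", "alert-3"]
    print(dispatch(RecordingHandler(), alerts, is_batch=False))
    print(dispatch(RecordingHandler(), alerts, is_batch=True))
```

Both modes deliver the same records; batch mode only reduces the number of callback invocations, which can matter when each invocation has a fixed cost such as a database round trip.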

Code Sample for Consuming the Subscribed Offline Data

The following code sample is for consuming the subscribed asset offline data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class OfflineDataServiceDemo {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the offline data service
        IDataService dataService = eosClient.getOfflineDataService();

        // Create the data handler function
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // Establish connection with subscription ID
        dataService.subscribe(dataHandler, subId);
    }
}

Code Sample for Consuming the Subscribed Event Data

The following code sample is for consuming the subscribed asset event data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
// Also import IEventHandler and IEventService from the SDK.
// Their package is not shown in this sample; check the SDK for the exact path.

public class EventServiceDemo {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the event data service
        IEventService eventService = eosClient.getEventService();

        // Create the data handler function
        IEventHandler eventHandler = new IEventHandler() {
            @Override
            public void eventRead(String event) {
                System.out.println(event);
            }
        };

        // Establish connection with subscription ID
        eventService.subscribe(eventHandler, subId);
    }
}

Code Sample for Consuming the Subscribed Device Event Report Data

The following code sample is for consuming the subscribed device event report data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.report.IReportHandler;
import com.envisioniot.sub.client.report.IReportService;
import com.envisioniot.sub.common.model.report.Report;

public class ReportServiceDemo {

    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // Get the device event report data service
        IReportService reportService = eosClient.getReportService();

        // Create the data handler function
        IReportHandler reportHandler = new IReportHandler() {
            @Override
            public void eventRead(Report report) {
                System.out.println(report);
            }
        };

        // Establish connection with subscription ID
        reportService.subscribe(reportHandler, subId);
    }
}

For Python SDK

The steps for installing the Python SDK and consuming the subscribed data are as follows.

Installing Data Subscription SDK for Python

The Data Subscription SDK for Python supports Python 2.7, Python 3.4, and newer versions.

  1. Install the following dependency modules.

    • six

    • google.protobuf

    • websocket_client

  2. Open the GitHub repository of the SDK to download the installation code: https://github.com/EnvisionIot/enos-subscription-service-sdk-python

  3. (Optional) Install the Data Subscription SDK for Python with pip.

pip install enos-subscribe
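Before running the samples, it can help to confirm that the dependency modules are importable. The following is a minimal sketch for Python 3 that uses only the standard library. The helper `missing_modules` is hypothetical, and the import names in `required` are assumptions (in particular, the websocket_client package is assumed to be imported as `websocket`).

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be found."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised, for example, when a parent package such as
            # `google` is not installed at all.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Assumed import names for the dependencies listed above.
    required = ["six", "google.protobuf", "websocket"]
    print("Missing modules:", missing_modules(required) or "none")
```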

Code Sample for Consuming the Subscribed Real-time Data

The following code sample is for consuming the subscribed asset real-time data with the Data Subscription SDK for Python.

from enos_subscribe import DataClient

if __name__ == '__main__':
    client = DataClient(host='Host of the subscription server', port='9001',
                        access_key='Access key of your APP',
                        access_secret='Secret key of your APP')

    client.subscribe(sub_id='ID of the subscription job')

    for message in client:
        print(message)

Note

  • The host and port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in to the EnOS Management Console and select Help > Environment Information.

  • By default, data is stored in the Kafka topic for 3 days.
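Because the host and port differ between cloud regions and instances, one option is to read the connection settings from environment variables instead of hard-coding them. The following is a minimal sketch under that assumption. The function `load_subscription_config` and the `ENOS_SUB_*` variable names are hypothetical and are not defined by the SDK.

```python
import os

# Hypothetical variable names; choose whatever fits your deployment.
REQUIRED_KEYS = {
    "host": "ENOS_SUB_HOST",
    "access_key": "ENOS_SUB_ACCESS_KEY",
    "access_secret": "ENOS_SUB_ACCESS_SECRET",
    "sub_id": "ENOS_SUB_ID",
}


def load_subscription_config(env=None):
    """Collect the connection settings from environment variables."""
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_KEYS.values() if not env.get(var)]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(sorted(missing)))
    config = {name: env[var] for name, var in REQUIRED_KEYS.items()}
    # The port is kept as a string, as in the sample above.
    config["port"] = env.get("ENOS_SUB_PORT", "9001")
    return config


if __name__ == "__main__":
    try:
        print(load_subscription_config())
    except ValueError as error:
        print(error)
```

The returned values can then be passed to the client constructor in place of the literal strings, for example `host=config["host"]` and `port=config["port"]`.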

Code Sample for Consuming the Subscribed Alert Data

The following code sample is for consuming the subscribed asset alert data with the Data Subscription SDK for Python.

from enos_subscribe import AlertClient

if __name__ == '__main__':
    client = AlertClient(host='Host of the subscription server', port='9001',
                         access_key='Access key of your APP',
                         access_secret='Secret key of your APP')

    client.subscribe(sub_id='ID of the subscription job')

    for message in client:
        print(message)

Code Sample for Consuming the Subscribed Offline Data

The following code sample is for consuming the subscribed asset offline data with the Data Subscription SDK for Python.

from enos_subscribe import OfflineDataClient

if __name__ == '__main__':
    client = OfflineDataClient(host='Host of the subscription server', port='9001',
                               access_key='Access key of your APP',
                               access_secret='Secret key of your APP')

    client.subscribe(sub_id='ID of the subscription job')

    for message in client:
        print(message)

Code Sample for Consuming the Subscribed Event Data

The following code sample is for consuming the subscribed asset event data with the Data Subscription SDK for Python.

from enos_subscribe import EventClient

if __name__ == '__main__':
    client = EventClient(host='sub-host', port='sub-port',
                        access_key='Your Access Key of this subscription',
                        access_secret='Your Access Secret of this subscription')

    client.subscribe(sub_id='Your subscription Id')

    for message in client:
        print(message)
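All of the Python samples above share the same pattern: iterate over the client and handle each message. In an application, a failure while handling one message usually should not stop consumption. The following sketch wraps that loop with basic error handling. The function `consume` and its `max_errors` parameter are hypothetical, and a plain list stands in for the client so that the example runs without the SDK.

```python
import logging


def consume(messages, handle, max_errors=10):
    """Pass each message to `handle` and keep going after failures.

    Returns a (processed, errors) tuple. Re-raises the last error
    once `max_errors` failures have been seen.
    """
    processed = 0
    errors = 0
    for message in messages:
        try:
            handle(message)
            processed += 1
        except Exception:
            errors += 1
            logging.exception("Failed to handle message: %r", message)
            if errors >= max_errors:
                raise
    return processed, errors


if __name__ == "__main__":
    def handle(message):
        if message is None:
            raise ValueError("empty message")
        print(message)

    # A list stands in for the subscription client in this sketch.
    print(consume(["m1", None, "m2"], handle))
```

With a real client, the first argument would be the client object itself, since the samples iterate over it directly.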

For .NET SDK

The steps for installing the .NET SDK and consuming the subscribed data are as follows.

Installing Data Subscription SDK for .NET

The Data Subscription SDK for .NET supports .NET Framework 4.7 and newer versions.


You can install the SDK from the Package Manager Console or build it from source.

Installing from Package Manager Console

The EnOS Data Subscription SDK for .NET is available in the Package Manager Console and can be installed with the following command:

Install-Package enos_subscription -Version 2.4.2

Installing from Source

  1. Open the GitHub repository of the SDK to download the installation code: https://github.com/EnvisionIot/enos-subscription-service-sdk-dotnet

  2. In Visual Studio, add the source code project to your solution and add it as a reference to your project.

Installing Dependency Modules

The EnOS Data Subscription SDK for .NET has the following dependency modules:

Code Sample for Consuming the Subscribed Real-time Data

The following code sample is for consuming the subscribed asset real-time data with the Data Subscription SDK for .NET.

using enos_subscription.client;

using (DataClient client = new DataClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

Note

  • The sub-host and sub-port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in to the EnOS Management Console and select Help > Environment Information.

  • By default, data is stored in the Kafka topic for 3 days.

Code Sample for Consuming the Subscribed Alert Data

The following code sample is for consuming the subscribed asset alert data with the Data Subscription SDK for .NET.

using enos_subscription.client;

using (AlertClient client = new AlertClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

Code Sample for Consuming the Subscribed Offline Data

The following code sample is for consuming the subscribed asset offline data with the Data Subscription SDK for .NET.

using enos_subscription.client;

using (OfflineClient client = new OfflineClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

Monitoring the Running Statistics of Data Subscription Jobs

When a huge volume of data is being subscribed to, you can check the running statistics of the data subscription job to ensure that the subscribed data is consumed in time.

  1. Log in to the EnOS Management Console and open the Data Subscription page. From the Operations column of the data subscription job list, select More (..) > Running Statistics.

  2. In the pop-up window, you can view the producer rates and consumer rates of the data subscription job, and see whether there is any delay of data consumption (by comparing the data producer rates with the data consumer rates).

    _images/subscription_statistics.png

    Note

    In the above example, Producer Rates shows the speed of data production from the real-time data channel, and Consumer Rates shows the speed of data consumption by each consumer group. The offset line and number indicate the total volume of subscribed data by the subscription job. The consumer group lines and number indicate the volume of subscribed data that has not been consumed by the consumer group.


If there is a delay in data consumption, you can run 2 consumer clients of the same consumer group to improve the data consumption efficiency.
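The comparison of producer and consumer rates can be turned into a rough estimate of how much unconsumed data accumulates. The following sketch shows the arithmetic only. It assumes both rates are expressed in records per second, which this topic does not state, and the function names are hypothetical.

```python
def backlog_after(producer_rate, consumer_rate, seconds, initial_backlog=0):
    """Estimate the unconsumed record count after `seconds`."""
    growth = (producer_rate - consumer_rate) * seconds
    return max(0, initial_backlog + growth)


def seconds_to_drain(backlog, producer_rate, consumer_rate):
    """Estimate the time to clear a backlog, or None if it never clears."""
    surplus = consumer_rate - producer_rate
    if surplus <= 0:
        return None
    return backlog / surplus


if __name__ == "__main__":
    # Producing 1000 records/s while consuming 800 records/s for 10 minutes.
    backlog = backlog_after(1000, 800, 600)
    print(backlog)  # 120000
    # Raising consumption to 1500 records/s clears it in 240 seconds.
    print(seconds_to_drain(backlog, 1000, 1500))  # 240.0
```

If `seconds_to_drain` returns None, the consumers are not keeping up at all, which is the case where a second consumer client in the same consumer group is worth trying.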