单元 2: 将个人电脑连接至 EnOS 并采集数据


在 EnOS 应用门户上完成 PC 的设备建模和设备注册之后,你现在可以使用 EnOS Java SDK for MQTT 进行编程,配置开发环境、编写连接代码、采集 PC 数据并上送至云端。


有关使用 EnOS Java SDK for MQTT 的详细信息,参阅 GitHub 上的 Readme 文件。

步骤 1:配置开发环境


设置 Java 开发环境,并添加必要的依赖项 Java SE 8 和 Maven 3,用于支持 EnOS Java SDK for MQTT 和系统信息采集。

  1. 安装 Java SE 8(JDK 8),下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

  2. 安装 Maven 3,下载地址:http://maven.apache.org/download.cgi

  3. 安装开发环境,如 IntelliJ IDEA,下载地址:https://www.jetbrains.com/idea/download/

  4. 在已创建项目的 pom.xml 中,添加 EnOS Java SDK for MQTT 的依赖项:

    <dependency>
        <groupId>com.enos-iot</groupId>
        <artifactId>enos-mqtt</artifactId>
        <version>2.5.2</version>
    </dependency>
    


  5. 在项目的主 pom.xml 文件中,如下所示添加 oshi 依赖项,用于采集计算机操作系统和硬件信息:

    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>3.13.2</version>
    </dependency>
    

步骤 2:编写设备连接代码


通过 EnOS Java SDK for MQTT 编写代码,将 PC 连接到 EnOS 云端。

  1. 创建一个新的 Java 类(例如 Sample.java),并添加以下必要的 import 语句:

    import com.enosiot.enos.iot_mqtt_sdk.core.ConnCallback;
    import com.enosiot.enos.iot_mqtt_sdk.core.MqttClient;
    import com.enosiot.enos.iot_mqtt_sdk.message.upstream.tsl.*;
    import oshi.SystemInfo;
    import oshi.hardware.HardwareAbstractionLayer;
    import oshi.software.os.OperatingSystem;
    import java.util.HashMap;
    import java.util.Map;
    
  2. 声明将在程序中使用的变量,替换以下占位符:

    private static final String uri = "tcp://{address}:11883"; // {address} 替换为实际 MQTT Broker 地址
    private static final String productKey = "your_product_key";
    private static final String deviceKey = "your_device_key";
    private static final String deviceSecret = "your_device_secret";
    private static MqttClient client;
    

    替换说明:

    • {address}{port} 替换为 EnOS 的 MQTT Broker 地址和端口。登录 EnOS 应用门户的开发控制台,点击右上角 帮助 > 环境信息 获取 MQTT Broker 地址和端口。

    • 推荐在测试或教学场景中使用 TCP 端口 11883,无需额外配置,但安全性较低。如果需要使用安全性更高的 SSL TCP 端口(18883),请确保你所在的 EnOS 环境启用了 SSL,并配置客户端的 SSL 证书,如有需要可联系系统管理员。

    • productKeydeviceKeydeviceSecret 是在 单元 1 中注册 PC 时生成的设备三元组。


  3. 编写 connect() 函数以初始化设备连接,并确保连接成功后再继续执行:

    public static void connect() throws Exception {
        System.out.println("Starting connection to EnOS Cloud...");
        try {
            client = new MqttClient(uri, productKey, deviceKey, deviceSecret);
            client.getProfile().setConnectionTimeout(60).setAutoReconnect(true);
            client.connect(new ConnCallback() {
                @Override
                public void connectComplete(boolean reconnect) {
                    System.out.println("Connection successful" + (reconnect ? " (reconnected)" : ""));
                }
    
                @Override
                public void connectLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }
    
                @Override
                public void connectFailed(Throwable cause) {
                    System.out.println("Connection failed: " + cause.getMessage());
                }
            });
            // 等待连接成功,最多等待 5 秒
            int maxWaitTime = 5000; // 毫秒
            long startTime = System.currentTimeMillis();
            while (!client.isConnected() && (System.currentTimeMillis() - startTime) < maxWaitTime) {
                Thread.sleep(100);
            }
            if (!client.isConnected()) {
                throw new Exception("Failed to connect to EnOS Cloud within timeout period.");
            }
            System.out.println("Connection status: " + client.isConnected());
        } catch (Throwable e) {
            System.err.println("Connection error: " + e.getMessage());
            throw e;
        }
    }
    

步骤 3:采集数据并将数据上传到 EnOS 云端


采集 PC 的系统和硬件数据,并将数据作为属性和测点上传到 EnOS 云端:

  1. 编写 collectDeviceInfo() 函数采集 PC 的系统和硬件数据。参考以下代码示例:

    public static Map<String, Object> collectDeviceInfo() {
         SystemInfo si = new SystemInfo();
         HardwareAbstractionLayer hal = si.getHardware();
         OperatingSystem os = si.getOperatingSystem();
    
         Map<String, Object> data = new HashMap<>();
         data.put("system", os.toString());
         data.put("model", hal.getComputerSystem().getManufacturer() + " " + hal.getComputerSystem().getModel());
         data.put("cpu_core", hal.getProcessor().getLogicalProcessorCount());
         data.put("cpu_used", hal.getProcessor().getSystemCpuLoad());
         data.put("mem_total", hal.getMemory().getTotal());
         data.put("mem_used", hal.getMemory().getAvailable()); // 注意:mem_used 表示可用内存
         data.put("cpu_used_average", hal.getProcessor().getSystemLoadAverage());
         data.put("cpu_temperature", hal.getSensors().getCpuTemperature());
    
         return data;
     }
    


  2. 编写 updateAttribute() 函数,使用已采集的系统和硬件数据更新 EnOS 应用门户上 PC 设备的属性。参考以下代码示例:

    public static void updateAttribute() {
         Map<String, Object> deviceInfo = collectDeviceInfo();
         System.out.println("Computer info: " + deviceInfo);
         AttributeUpdateRequest request = AttributeUpdateRequest.builder()
                 .setQos(1)
                 .addAttribute("system", deviceInfo.get("system"))
                 .addAttribute("model", deviceInfo.get("model"))
                 .addAttribute("cpu_core", deviceInfo.get("cpu_core"))
                 .addAttribute("mem_total", deviceInfo.get("mem_total"))
                 .build();
         System.out.println(">>> Update Attribute: " + request);
    
         try {
             AttributeUpdateResponse resp = client.publish(request);
             System.out.println("<-- Response: " + resp);
         } catch (Exception e) {
             System.err.println("Failed to update attributes: " + e.getMessage());
             e.printStackTrace();
         }
     }
    


  3. 编写 postMeasurepoint 函数,将采集的 CPU 负载和内存使用数据作为测点上传到 EnOS 云端。参考以下代码示例:

    public static void postMeasurepoint(Map<String, Object> systemInfo) {
         MeasurepointPostRequest request = MeasurepointPostRequest.builder()
                 .setQos(0)
                 .addMeasurePoint("cpu_used", (Double) systemInfo.get("cpu_used"))
                 .addMeasurePoint("mem_used", ((Long) systemInfo.get("mem_used")).doubleValue()) // 转换为 double 类型
                 .build();
         System.out.println(">>> Post Measurepoint: " + request);
    
         try {
             MeasurepointPostResponse resp = client.publish(request);
             System.out.println("<-- Response: " + resp);
         } catch (Exception e) {
             System.err.println("Failed to post measurepoint: " + e.getMessage());
             e.printStackTrace();
         }
     }
    

步骤 4:运行程序并检查结果


编译并运行程序,验证设备连接和数据上传是否成功,并在 EnOS 应用门户上检查结果:

  1. 整合所有代码,创建一个完整的 Sample.java 文件:

    import com.enosiot.enos.iot_mqtt_sdk.core.ConnCallback;
    import com.enosiot.enos.iot_mqtt_sdk.core.MqttClient;
    import com.enosiot.enos.iot_mqtt_sdk.message.upstream.tsl.*;
    import oshi.SystemInfo;
    import oshi.hardware.HardwareAbstractionLayer;
    import oshi.software.os.OperatingSystem;
    import java.util.HashMap;
    import java.util.Map;
    
    public class Sample {
        private static final String uri = "tcp://{address}:11883"; // 替换为实际 MQTT Broker 地址
        private static final String productKey = "your_product_key";
        private static final String deviceKey = "your_device_key";
        private static final String deviceSecret = "your_device_secret";
        private static MqttClient client;
    
        public static void main(String[] args) throws Exception {
            connect();
            updateAttribute();
            postMeasurepoint(collectDeviceInfo());
        }
    
        public static void connect() throws Exception {
            System.out.println("Starting connection to EnOS Cloud...");
            try {
                client = new MqttClient(uri, productKey, deviceKey, deviceSecret);
                client.getProfile().setConnectionTimeout(60).setAutoReconnect(true);
                client.connect(new ConnCallback() {
                    @Override
                    public void connectComplete(boolean reconnect) {
                        System.out.println("Connection successful" + (reconnect ? " (reconnected)" : ""));
                    }
    
                    @Override
                    public void connectLost(Throwable cause) {
                        System.out.println("Connection lost: " + cause.getMessage());
                    }
    
                    @Override
                    public void connectFailed(Throwable cause) {
                        System.out.println("Connection failed: " + cause.getMessage());
                    }
                });
                // 等待连接成功,最多等待 5 秒
                int maxWaitTime = 5000; // 毫秒
                long startTime = System.currentTimeMillis();
                while (!client.isConnected() && (System.currentTimeMillis() - startTime) < maxWaitTime) {
                    Thread.sleep(100);
                }
                if (!client.isConnected()) {
                    throw new Exception("Failed to connect to EnOS Cloud within timeout period.");
                }
                System.out.println("Connection status: " + client.isConnected());
            } catch (Throwable e) {
                System.err.println("Connection error: " + e.getMessage());
                throw e;
            }
        }
    
        public static Map<String, Object> collectDeviceInfo() {
            SystemInfo si = new SystemInfo();
            HardwareAbstractionLayer hal = si.getHardware();
            OperatingSystem os = si.getOperatingSystem();
    
            Map<String, Object> data = new HashMap<>();
            data.put("system", os.toString());
            data.put("model", hal.getComputerSystem().getManufacturer() + " " + hal.getComputerSystem().getModel());
            data.put("cpu_core", hal.getProcessor().getLogicalProcessorCount());
            data.put("cpu_used", hal.getProcessor().getSystemCpuLoad());
            data.put("mem_total", hal.getMemory().getTotal());
            data.put("mem_used", hal.getMemory().getAvailable()); // 表示可用内存
            data.put("cpu_used_average", hal.getProcessor().getSystemLoadAverage());
            data.put("cpu_temperature", hal.getSensors().getCpuTemperature());
    
            return data;
        }
    
        public static void updateAttribute() {
            Map<String, Object> deviceInfo = collectDeviceInfo();
            System.out.println("Computer info: " + deviceInfo);
            AttributeUpdateRequest request = AttributeUpdateRequest.builder()
                    .setQos(1)
                    .addAttribute("system", deviceInfo.get("system"))
                    .addAttribute("model", deviceInfo.get("model"))
                    .addAttribute("cpu_core", deviceInfo.get("cpu_core"))
                    .addAttribute("mem_total", deviceInfo.get("mem_total"))
                    .build();
            System.out.println(">>> Update Attribute: " + request);
    
            try {
                AttributeUpdateResponse resp = client.publish(request);
                System.out.println("<-- Response: " + resp);
            } catch (Exception e) {
                System.err.println("Failed to update attributes: " + e.getMessage());
                e.printStackTrace();
            }
        }
    
        public static void postMeasurepoint(Map<String, Object> systemInfo) {
            MeasurepointPostRequest request = MeasurepointPostRequest.builder()
                    .setQos(0)
                    .addMeasurePoint("cpu_used", (Double) systemInfo.get("cpu_used"))
                    .addMeasurePoint("mem_used", ((Long) systemInfo.get("mem_used")).doubleValue())
                    .build();
            System.out.println(">>> Post Measurepoint: " + request);
    
            try {
                MeasurepointPostResponse resp = client.publish(request);
                System.out.println("<-- Response: " + resp);
            } catch (Exception e) {
                System.err.println("Failed to post measurepoint: " + e.getMessage());
                e.printStackTrace();
            }
        }
    }
    


  2. 编译并运行程序。如果连接和数据上传成功,程序将输出类似以下结果:

    Start connect with callback ...
    
    connect result :true
    connect success
    
    Computer info: {mem_used=1401421824, cpu_used_average=-1.0, cpu_temperature=0.0, cpu_used=1.0, system=Microsoft Windows 10 build 17134, cpu_core=4, model=LENOVO 80T9, mem_total=8135401472}
    
    >>> Update Attribute: AnswerableMessageBody{id='null', method='thing.attribute.update', version='null', params={attributes={system=Microsoft Windows 10 build 17134, cpu_core=4, model=LENOVO 80T9, mem_total=8135401472}}}
    
    >>>Post Measurepoint: AnswerableMessageBody{id='null', method='thing.measurepoint.post', version='null', params={measurepoints={mem_used=1314672640, cpu_used=0.4697233989534014}, time=1565841030584}}
    


  3. 登录 EnOS 应用门户的开发者控制台,在 设备管理 > 设备资产 中检查 PC 设备的状态变更。设备的状态为 在线


    ../../_images/device_status1.png


  4. 在设备详情页面的 属性 选项卡下,检查 PC 设备的属性是否更新成功。


    ../../_images/updated_attributes.png


  5. 在设备详情页面上 测点 选项卡下,检查上传的 cpu_usedmem_used 测点数据是否正确展示。


    ../../_images/uploaded_data.png


    注意:如果 mem_used 数据未显示,请确保 EnOS 设备模型中 mem_used 测点的数据类型定义为 double。若仍有问题,检查 postMeasurepoint 函数中的数据类型转换。

下一单元


单元 3:监测 CPU 负载