MQTT Sub


MQTT Sub 节点建立一个 MQTT 客户端,用以订阅第三方 MQTT 服务器。

节点类型

Input。

输入和输出能力

该节点没有入口点,仅有 1 个出口点。输出为客户端从所订阅服务器处检索到的消息,可以是任何格式。


订阅信息的 topic 将包含在 metadata 中。后续可使用表达式 ${metadata.topic} 获取。

节点属性


提供 MQTT 信息

../../_images/mqtt_sub_mqtt.png


从代理配置获取

../../_images/mqtt_sub_server.png


名称

该节点的名称。


配置方式

配置 MQTT 服务器的方式。支持的配置方式如下。

  • 提供 MQTT 信息:提供 MQTT 服务器的相关信息。如果选中此项,则需要填写以下字段。
    • 主机:MQTT 服务器的 URL。
    • 端口:MQTT 服务器的端口。
    • 鉴权:验证该节点发出请求所用的方法。其值如下所述。
      • 匿名:无需鉴权。
      • 用户名/密码:在鉴权请求中传递用户名/密码。如果选中此项,则必须填写以下字段。
        • 用户名:客户端登录服务器所用的用户名。
        • 密码:客户端登录服务器所用的密码。
      • 单向认证:在 MQTT 客户端对从服务器获取的凭证进行鉴权时,使用加密通信。如果选中此项,则需要填写以下字段。
        • 证书:由 MQTT 鉴权的、来自服务器的证书。
      • 双向:在 MQTT 客户端和服务器对彼此的凭证进行鉴权时,使用加密通信。如果选中此项,则需要填写以下字段。
        • 证书:由 MQTT 鉴权的、来自服务器的证书。
        • 客户端证书:由 MQTT 服务器进行鉴权的客户端证书。
        • 客户端证书密钥:需要鉴权的客户端证书密钥。
        • 客户端密码:需要鉴权的客户端密码。
  • 从代理配置获取:从下拉框选择一个代理作为 MQTT 服务器。更多信息,参见 代理配置


Topic

订阅信息的主题。最多可以指定 5 个主题。


QoS

定义订阅 MQTT 服务器消息时可使用的最大消息服务质量等级(Quality of Service,QoS),从低到高分为 0、1、2 三个等级,可以根据实际需求进行配置。默认值是 2。

QoS 0 1 2
服务器(生产者) 仅发送一条消息,不储存消息,也不确认接收。 至少发送一条消息,确认消息接收。如果没有收到应答,将重复发送消息。 仅发送一条消息,发送消息后储存消息,确认接收后将二次确认,再将消息从队列中删除。
客户端(消费者) 可能收到一条消息,或无法收到消息,不返回任何应答。 至少收到一条消息,收到消息后返回应答。 仅收到一条消息,收到消息后返回应答,二次确认后消息传输完成。
优点 传输速度最快,占用资源最少。 性能最优,消息不会丢失。 消息不会丢失,且只会收到一次。
不足 消息容易丢失。 客户端可能会重复收到同一条消息。 传输速度最慢,占用资源最多。
推荐场景
  • 在同一个子网内部的服务间的消息交互,或其他服务器和客户端连接非常稳定的场景。
  • 可以接受消息偶尔丢失。
  • 要求传输性能达到最优。
  • 消息不能丢失,但可以接受处理重复的消息。
  • 消息不能丢失,且客户端不能处理重复的消息。
  • 要求完整收到消息,不介意延时。


并发数

并行处理来自 MQTT 服务器的消息数量。启用并发数可以提升消息处理速度,但可能打乱消息的顺序。建议在对消息顺序无要求,且追求最佳性能时使用。数值范围为 1-4,默认值为 1。你可以通过节点日志中的 运行时长 查看单位时间内处理的消息数量,以此判断消息处理速度是否提升。关于运行时长的更多信息,参见 节点日志


并发数受当前流的 单实例运行资源数 影响。关于流的单实例运行资源配置,参见 运行资源。为达到最佳性能,推荐的并发数与流的运行实例数关系如下:

  • 当 1 ≤ 单实例运行资源数 ≤ 2 时,并发数为 1。
  • 当 2 < 单实例运行资源数 ≤ 4 时,并发数为 2。
  • 当 4 < 单实例运行资源数 ≤ 6 时,并发数为 3。
  • 当 6 < 单实例运行资源数 ≤ 8 时,并发数为 4。


清除先前会话

开启后,每当客户端重新连接到服务器时,将清除客户端和服务器之间的先前会话。


描述

该节点的描述。


测试连接

你可以选择 测试连接 按钮来测试 MQTT 连接。

使用限制

  • 连接数:1
  • 最大 topic 数:5
  • 并发数范围: 1-4
  • 由于 MQTT 服务器并非 EnOS 管理,EnOS 不保证每次连接都能成功,连接问题可能是由于服务器停机或其他 EnOS 无法控制的因素造成。

示例

输入示例

JSON 对象作为输入的示例:

{
   "externalId": "externalId",
   "timestamp": 24214324324,
   "measurepoints": {
     "speed": 32,
     "heat": 40
   }
}

输出示例

从 MQTT 服务器检索到的 JSON 对象示例。其 topic 信息存储在 metadata 中。

{
    "MetaData":{
    "CamelMqttTopic": "officeTemp",
    "topic": "officeTemp",
    "CamelMqttQoS": "2"
    },
    "Body":{
        "externalId": "externalId",
        "timestamp": 24214324324,
        "measurepoints": {
            "speed": 32,
            "heat": 40
        }
    }
}

相关节点