前面我们已经研究了Kafka,这里我们再学习MQTT协议相关知识,最后你会发现两者结合使用才是精彩绝伦。

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。

为什么 MQTT 是适用于物联网的最佳协议

MQTT 所具有的适用于物联网特定需求的特点和功能,使其成为物联网领域最佳的协议之一。它的主要特点包括:

  1. 轻量级:物联网设备通常在处理能力、内存和能耗方面受到限制。MQTT 开销低、报文小的特点使其非常适合这些设备,因为它消耗更少的资源,即使在有限的能力下也能实现高效的通信。
  2. 可靠:物联网网络常常面临高延迟或连接不稳定的情况。MQTT 支持多种 QoS 等级、会话感知和持久连接,即使在困难的条件下也能保证消息的可靠传递,使其非常适合物联网应用。
  3. 安全通信:安全对于物联网网络至关重要,因为其经常涉及敏感数据的传输。为确保数据在传输过程中的机密性,MQTT 提供传输层安全(TLS)和安全套接层(SSL)加密功能。此外,MQTT 还通过用户名/密码凭证或客户端证书提供身份验证和授权机制,以保护网络及其资源的访问。
  4. 双向通信:MQTT 的发布-订阅模式为设备之间提供了无缝的双向通信方式。客户端既可以向主题发布消息,也可以订阅接收特定主题上的消息,从而实现了物联网生态系统中的高效数据交换,而无需直接将设备耦合在一起。这种模式也简化了新设备的集成,同时保证了系统易于扩展。
  5. 连续、有状态的会话:MQTT 提供了客户端与 Broker 之间保持有状态会话的能力,这使得系统即使在断开连接后也能记住订阅和未传递的消息。此外,客户端还可以在建立连接时指定一个保活间隔,这会促使 Broker 定期检查连接状态。如果连接中断,Broker 会储存未传递的消息(根据 QoS 级别确定),并在客户端重新连接时尝试传递它们。这个特性保证了通信的可靠性,降低了因间断性连接而导致数据丢失的风险。
  6. 大规模物联网设备支持:物联网系统往往涉及大量设备,需要一种能够处理大规模部署的协议。MQTT 的轻量级特性、低带宽消耗和对资源的高效利用使其成为大规模物联网应用的理想选择。通过采用发布-订阅模式,MQTT 实现了发送者和接收者的解耦,从而有效地减少了网络流量和资源使用。此外,协议对不同 QoS 等级的支持使得消息传递可以根据需求进行定制,确保在各种场景下获得最佳的性能表现。
  7. 语言支持:物联网系统包含使用各种编程语言开发的设备和应用。MQTT 具有广泛的语言支持,使其能够轻松与多个平台和技术进行集成,从而实现了物联网生态系统中的无缝通信和互操作性。比如在PHP、Node.js、Python、Golang、Node.js 等编程语言中使用 MQTT。

不同语言的SDK参考:

https://www.emqx.com/zh/mqtt-client-sdk

MQTT典型应用场景

  • 物联网(IoT):MQTT在物联网领域非常流行,因为它可以在低功耗设备之间有效地传输数据。智能家居系统、智能农业、工业自动化等领域都可以使用MQTT实现设备之间的通信和信息共享
  • 能源管理:MQTT可以用于远程监控和管理太阳能系统、智能电网、风能发电等可再生能源设备。通过MQTT,用户可以实时监控设备状态、调整设备参数以提高能源效率
  • 车载系统:在汽车行业,MQTT可以用于实现车辆与车载信息娱乐系统之间的通信。例如,驾驶员可以通过MQTT向车载系统发送导航指令、获取路况信息等
  • 环境监测:MQTT可以用于实时监测环境参数,如空气质量、湿度、温度等。通过MQTT,环境监测设备可以将数据发送到云端,便于用户远程查看和分析
  • 医疗健康:MQTT可以用于远程监护患者,如心电监护仪、血糖仪等。通过MQTT,医疗机构可以实时获取患者的健康数据,为患者提供及时的诊断和治疗建议
  • 无人机:MQTT可用于无人机与地面控制站之间的通信。无人机可以通过MQTT实时报告飞行状态、位置、天气等信息,便于地面控制站进行远程监控和控制
  • 金融服务:MQTT可以用于实时传输金融数据,如股票价格、汇率等。金融机构可以使用MQTT实现跨地区、跨平台的数据同步和实时交易

MQTT 的工作原理

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

常见的MQTT Broker实现有Mosquitto、HiveMQ、NanoMQ、EMQX、VerneMQ等。

  1. HiveMQ:企业级 MQTT Broker,Java语言编写,提供可靠的消息传输和大规模 IoT 应用支持。
  2. EMQX:开源 MQTT Broker,Erlang语言编写,国货,支持高吞吐量和低延迟,适合大规模 IoT 设备连接。它提供了丰富的功能,如多协议支持、WebSocket、SSL/TLS加密等。
  3. Mosquitto:轻量级开源 MQTT Broker,C语言编写,适合小型 IoT 项目或嵌入式设备。
  4. NanoMQ:一个轻量级的 MQTT Broker,专为 IoT 边缘场景设计。纯 C 语言编写,基于 NNG 的异步 I/O 多线程 Actor 模型。
  5. Kafka:虽然Kafka不是基于MQTT协议的,但它是一个分布式流处理平台,可以用于构建实时数据管道和消息队列。Kafka可以与MQTT代理集成,以实现实时数据传输。

MQTT是基于TCP协议实现,基于HTTP的网页应用便无法与之交互。为了解决这个问题,许多MQTT Broker加上了对WebSockets的支持。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:

1
2
3
4
5
6
7
8
9
# topic示例
chat/room/1
sensor/10/temperature
sensor/+/temperature

# MQTT 主题支持以下两种通配符:`+` 和 `#`。
# `+`:表示单层通配符,例如 `a/+` 匹配 `a/x` 或 `a/y`。
# `#`:表示多层通配符,例如 `a/#` 匹配 `a/x`、`a/b/c/d`。
# **注意**:通配符主题只能用于订阅,不能用于发布。

QoS

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  1. QoS 0消息最多传送一次:意思就是给你转发一次就得了,不管你有没收到。如果当前客户端不可用,它将丢失这条消息。
  2. QoS 1消息至少传送一次:也就是说服务器给你重试转发,直到服务器收到客户端的确认消息。确保至少向客户端发送一次信息,不过也可发送多次;在接收数据包时,需要客户端返回确认消息(ACK 包)。这种方式常用于传递确保交付的信息,但开发人员必须确保其系统可以处理重复的数据包。
  3. QoS 2消息只传送一次:服务器保证你肯定能收到一次,而且只有一次。

不同 QoS 的性能有差距么?以 EMQX 为例,在相同的硬件配置下进行点对点通信,通常 QoS 0 与 QoS 1 能够达到的吞吐比较接近,不过 QoS 1 的 CPU 占用会略高于 QoS 0,负载较高时,QoS 1 的消息延迟也会进一步增加。而 QoS 2 能够达到的吞吐一般仅为 QoS 0、1 的一半左右。

参考:https://www.emqx.com/zh/blog/introduction-to-mqtt-qos

保留消息

当 MQTT 客户端订阅新主题时,broker 可以存储一条保留消息,以便在新的订阅者加入时立即发送,这确保订阅者在订阅时能够至少接收到一条消息。

遗嘱消息(LWT)

MQTT 客户端可以向 broker 指定一条 遗嘱消息,在客户端异常断开时自动发送,提供全局的系统通知,告知某个客户端已断开连接。

MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接。可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息。并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

Mosquitto的安装和使用

实现MQTT协议的一款常见的工具是Mosquitto,下面我们来安装并测试。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# centos系统安装并启动服务
yum install mosquitto
systemctl start mosquitto.service 
systemctl status mosquitto.service

# mosquitto --help
mosquitto version 1.6.15
mosquitto is an MQTT v5.0/v3.1.1/v3.1 broker.

# 一种参考配置: /etc/mosquitto/mosquitto.conf
# =================================================================
# General configuration
# =================================================================
#per_listener_settings false
#allow_duplicate_messages false
#allow_zero_length_clientid true
#auto_id_prefix auto-
#check_retain_source true

#max_inflight_bytes 0
#max_inflight_messages 20
#max_keepalive 65535
#max_packet_size 0
#max_queued_bytes 0

#max_queued_messages 100
#memory_limit 0
#message_size_limit 0

#persistent_client_expiration
pid_file /var/run/mosquitto.pid
#queue_qos0_messages false
#retain_available true
#set_tcp_nodelay false
#sys_interval 10
#upgrade_outgoing_qos false
user mosquitto # 安装时就自动创建这个用户
#groupadd mosquitto
#useradd -g mosquitto mosquitto


# =================================================================
# Default listener
# =================================================================
bind_address 10.10.213.13
port 1883
#bind_interface eth0
#http_dir
#max_connections -1
#protocol mqtt
#use_username_as_clientid

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
#cafile
#capath
#certfile
#keyfile
#crlfile
#ciphers DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH
#dhparamfile
#require_certificate false
#tls_version
#use_identity_as_username false
#use_subject_as_username false

# -----------------------------------------------------------------
# Pre-shared-key based SSL/TLS support
# -----------------------------------------------------------------
#psk_hint
#ciphers
#use_identity_as_username false

# =================================================================
# Extra listeners
# =================================================================
# 支持websocket协议访问
listener 9001 10.10.213.13
http_dir /etc/mosquitto/
protocol websockets
#listener
#bind_interface
#http_dir
#max_connections -1
#mount_point
#protocol mqtt
#use_username_as_clientid
#websockets_headers_size

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
#cafile
#capath
#certfile
#keyfile
#ciphers
#crlfile
#dhparamfile
#require_certificate false
#use_identity_as_username false

# -----------------------------------------------------------------
# Pre-shared-key based SSL/TLS support
# -----------------------------------------------------------------
#psk_hint
#ciphers
#use_identity_as_username false

# =================================================================
# Persistence
# =================================================================
#autosave_interval 1800
#autosave_on_changes false
# 启动消息持久化存储
persistence true
persistence_file mosquitto.db
persistence_location /etc/mosquitto/

# =================================================================
# Logging
# =================================================================
log_dest file /etc/mosquitto/log.log
#log_dest syslog
#log_type error
#log_type warning
#log_type notice
#log_type information
#connection_messages true
#log_facility
#log_timestamp true
#log_timestamp_format
#websockets_log_level 0

# =================================================================
# Security
# =================================================================
#clientid_prefixes
# 禁止匿名访问
allow_anonymous false

# -----------------------------------------------------------------
# Default authentication and topic access control
# -----------------------------------------------------------------
#password_file /etc/mosquitto/pwfile # 需要认证账号密码才能访问
#psk_file # 权限认证
#acl_file

# -----------------------------------------------------------------
# External authentication and topic access plugin options
# -----------------------------------------------------------------
auth_plugin /etc/mosquitto/plugin/auth-plug.so

auth_opt_backends http
auth_opt_http_ip 10.10.213.12
auth_opt_http_port 8089
#auth_opt_http_hostname example.org
auth_opt_http_getuser_uri /auth
auth_opt_http_superuser_uri /superuser
auth_opt_http_aclcheck_uri /acl

# If the auth_plugin option above is used, define options to pass to the
# plugin here as described by the plugin instructions. All options named
# using the format auth_opt_* will be passed to the plugin, for example:
# auth_opt_db_host
# auth_opt_db_port
# auth_opt_db_username
# auth_opt_db_password

# =================================================================
# Bridges
# =================================================================
#connection <name>
#address <host>[:<port>] [<host>[:<port>]]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
#bridge_attempt_unsubscribe true
#bridge_protocol_version mqttv311
#cleansession false
#idle_timeout 60
#keepalive_interval 60
#local_clientid
#notifications true
#notification_topic
#remote_clientid
#remote_password
#remote_username
#restart_timeout 5 30
#round_robin false
#start_type automatic
#threshold 10
#try_private true

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
#bridge_cafile
#bridge_capath
#bridge_alpn
#bridge_insecure false
#bridge_certfile
#bridge_keyfile

# -----------------------------------------------------------------
# PSK based SSL/TLS support
# -----------------------------------------------------------------
#bridge_identity
#bridge_psk

# =================================================================
# External config files
# =================================================================
#include_dir

当然我们也可以采用手动运行的方式运行broker

1
2
mosquitto [-c config file] [ -d | --daemon ] [-p port number] [-v]
mosquitto -c /etc/mosquitto/mosquitto.conf -d
  • -c 启动mosquitto可以调整的参数,配置文件
  • -d 表示MQTT mosquitto将在后台运行。
  • -p 代表当前的mosquitto服务实例启动以后,其监听端口号
  • -v 代码调试模式(verbose)

mosquitto_sub订阅消息

1
2
3
4
mosquitto_sub -v -t "topic1"

# 订阅客户端存活连接数
mosquitto_sub -h localhost -t '$SYS/broker/clients/active' -u book -P root
  • -c 禁止’clean session’选项,即如果客户端断开连接,这个订阅仍然保留来接收随后到的QoS为1和2的消息,当改客户端重新连接之后,它将接收到已排在队列中的消息。建议使用此选项时,客户端id选项设为–id
  • -d 开启debug选项
  • -h 说明所连接到的域名,默认是localhost
  • -i 客户端的ID号,如果没有指定,默认是mosquitto_pub_加上客户端的进程id,不能和–id_prefix同时使用。
  • -I 指定客户端ID的前缀,与客户端的进程ID连接组成客户端的ID,不能喝–id同时使用。
  • -k 给代理发送PING命令(目的在于告知代理该客户端连接保持且在正常工作)的间隔时间,默认是60s
  • -p 说明客户端连接到的端口,默认是1883
  • -P 指定密码用于代理认证,使用此选项时必须有有效的用户名。
  • -q 指定消息的服务质量,可以为0,1,2,默认是0.
  • –quiet 如果指定该选项,则不会有任何错误被打印,当然,这排除了无效的用户输入所引起的错误消息。
  • -t 指定订阅的消息主题,允许同时订阅到多个主题
  • -u 指定用户名用于代理认证。
  • -v, 冗长地打印收到的消息。若指定该选项,打印消息时前面会打印主题名——“主题 消息内容”,否则,只打印消息内容
  • –will-payload 如果指定该选项,则万一客户端意外和代理服务器断开,则该消息将被保留在服务端并发送出去,该选项必须同时用–will-topic指定主题。
  • –will-qos 指定Will的服务质量,默认是0.必须和选项 –will-topic同时使用.
  • –will-retain 如果指定该选项,则万一客户端意外断开,已被发送的消息将被当做retained消息。必须和选项 –will-topic同时使用.
  • –will-topic 指定客户端意外断开时,Will消息发送到的主题。

mosquitto_pub发布消息

1
mosquitto_pub -t "topic1" -i "client_id" -m "Hello World"
  • -d 开启debug选项
  • -f 把一个文件的内容做为消息的内容发送。经测试,支持txt文件,不支持doc等其他形式文件。
  • -h 说明所连接到的域名,默认是localhost
  • -i 客户端的ID号,如果没有指定,默认是mosquitto_pub_加上客户端的进程id,不能和–id_prefix同时使用。
  • -I 指定客户端ID的前缀,与客户端的进程ID连接组成客户端的ID,不能和–id同时使用。
  • -l 从总段读取输入发送消息,一行为一条消息,空白行不会被发送。
  • -m 从命令行发送一条消息,-m后面跟发送的消息内容。
  • -n 发送一条空消息。
  • -p 连接的端口号,默认是1883.
  • -P 指定密码用于代理认证,使用此选项时必须有有效的用户名。
  • -q 指定消息的服务质量,可以为0,1,2,默认是0.
  • –quiet:如果指定该选项,则不会有任何错误被打印,当然,这排除了无效的用户输入所引起的错误消息。
  • -r 如果指定该选项,该条消息将被保留做为最后一条收到的消息。下一个订阅消息者将能至少收到该条消息。
  • -s 从标准输入接收传输的消息内容,所有输入做为一条消息发送。
  • -t 指定消息所发布到哪个主题。
  • -u 指定用户名用于代理认证。
  • –will-payload 如果指定该选项,则万一客户端意外和代理服务器断开,则该消息将被保留在服务端并发送出去,该选项必须同时用–will-topic指定主题。
  • –will-qos 指定Will的服务质量,默认是0.必须和选项 –will-topic同时使用.
  • –will-retain 如果指定该选项,则万一客户端意外断开,已被发送的消息将被当做retained消息。必须和选项 –will-topic同时使用.
  • –will-topic 指定客户端意外断开时,Will消息发送到的主题

mosquitto_passwd管理认证账号密码

这里passwordfile: 需要修改的password文件名称,username: 将要add/update/delete的用户名,password:密码。

1
2
3
mosquitto_passwd [-c | -D] passwordfile username
mosquitto_passwd -b passwordfile username password
mosquitto_passwd -U passwordfile
  • -b: 以batch mode运行,这样password可以在命令行中提供。这样做虽然用起来方便,但是有风险,因为password是可见的,命令行的历史记录也可以看到。
  • -c: 创建一个新的password文件,如果文件已经存在,会被新文件替换
  • -D: 从password文件删除一个指定的用户
  • -U: 将一个纯文本的password文件转为hash 密码文件。它不会检查文件是否已经做过hash,也就是说如果文件已经做了hash,它会继续以此文件为基础做一次hash并导致此password文件不可用。

比如我们在broker上使用账号密码认证登录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
touch /etc/mosquitto/pwfile 					# 创建密码文件
mosquitto_passwd /etc/mosquitto/pwfile chende 	# 会提示2次输入密码

# 然后在配置文件中启用账号密码访问,重启服务即可
allow_anonymous false
password_file /etc/mosquitto/pwfile

# PS:或者用插件形式支持外部http接口调用账号密码做认证
auth_plugin /etc/mosquitto/plugin/auth-plug.so

auth_opt_backends http
auth_opt_http_ip 10.10.213.12
auth_opt_http_port 8089
#auth_opt_http_hostname example.org
auth_opt_http_getuser_uri /auth
auth_opt_http_superuser_uri /superuser
auth_opt_http_aclcheck_uri /acl

mosquitto_rr

mosquitto_rr是一个 MQTT 客户端,可用于发布请求消息并等待响应。

MQTT3.1.1 和 MQTT5.0

目前主流的MQTT协议版本主要有两个:MQTT 3.1.1和MQTT 5.0。

  1. MQTT 3.1.1:
    • 这是MQTT协议的一个较早且广泛使用的版本。
    • 它提供了基本的发布/订阅消息模式,支持一对多的消息发布,解除了应用程序的耦合。
    • 基于TCP/IP提供网络连接,同时也有基于UDP的版本,称为MQTT-SN。
    • 支持QoS(服务质量)等级,允许根据消息的重要性设置不同的服务质量等级。
    • 具有小型传输、开销小的特点,协议交换最小化,以降低网络流量。
    • 使用will遗嘱机制来通知客户端异常断线。
    • 基于主题发布/订阅消息,对负载内容屏蔽的消息传输。
    • 支持心跳机制以保持连接状态。
  2. MQTT 5.0:
    • 作为MQTT协议的最新版本,它在3.1.1版本的基础上增加了许多新特性和改进。
    • 提供了更多的会话管理功能,如共享订阅和会话持久化。
    • 增强了认证和授权机制,包括使用TLS加密连接和OAuth 2.0进行认证。
    • 引入了新的QoS级别3,提供了更加可靠的消息传递保证。
    • 增加了消息过期时间、主题别名和响应主题等特性,以优化消息传输和处理。
    • 提供了更好的错误处理和恢复机制,以及更加灵活的连接和断开方式。

MQTT 3.1.1版本已经足够满足大多数物联网应用的需求,并且由于其广泛的兼容性和稳定性而被广泛使用。而MQTT 5.0版本则提供了更多的高级特性和改进,适用于需要更高性能和更复杂功能的场景。在选择使用哪个版本的MQTT协议时,需要根据具体的应用需求和技术环境来做出决策。

MQTT会话

MQTT会话本质上就是一组需要服务端和客户端额外存储的上下文数据,这些数据可以仅持续与网络连接一样长的时间,也可以跨越多个连续的网络连接存在。当客户端与服务端借助这些会话数据恢复通信时,可以让网络中断就像从未发生过一样。

以服务端为例,它需要存储客户端的订阅列表,那么不管当前客户端是否连接,只要会话没有过期,服务端就能够知道哪些消息是被该客户端订阅的,进而为它缓存这些消息。另外,客户端再次连接时也不需要重新发起订阅,这也减少了服务端的性能开销。

MQTT为服务端和客户端分别定义了它们需要存储的会话状态。

对于服务端来说,它需要存储以下内容:

  1. 会话是否存在。
  2. 客户端的订阅列表。
  3. 已发送给客户端,但是还没有完成确认的 QoS 1 和 QoS 2 消息。
  4. 等待传输给客户端的 QoS 0 消息(可选),QoS 1 和 QoS 2 消息。
  5. 从客户端收到的,但是还没有完成确认的 QoS 2 消息。
  6. 遗嘱消息与遗嘱过期间隔。
  7. 会话过期时间。

对于客户端来说,它需要存储以下内容:

  1. 已发送给服务端,但是还没有完成确认的 QoS 1 和 QoS 2 消息。
  2. 从服务端收到的,但是还没有完成确认的 QoS 2 消息。

显而易见的是,让服务端和客户端永久存储这些会话数据,不仅会带来很多额外的存储成本,而且在很多场景中也没有必要。譬如我们只是为了避免网络连接短暂中断导致的消息丢失,那么一般将会话数据设置为在连接断开后保留短暂的几分钟即可。

另外,当客户端与服务端会话状态不一致时,比如客户端设备因为重启导致会话数据丢失,那么它需要在连接时告知服务端丢弃原有的会话并创建一个全新的会话。

针对这两点,MQTT提供了 Clean Start 和 Session Expiry Interval 这两个连接字段来控制会话的生命周期。

Clean Start

Clean Start 位于 CONNECT 报文的 可变报头,客户端在连接时通过这个字段指定是否复用已存在的会话,它只有 0 和 1 两个可取值。

  1. 当 Clean Start 设置为 0,如果服务端存在与客户端连接时指定的 Client ID 关联的会话,那么它必须使用这个会话来恢复通信。如果不存在任何与该 Client ID 关联的会话,则服务端必须创建一个全新的会话。这时,客户端使用的是旧的会话,服务端使用的是全新的会话,两边的会话状态出现了不一致。所以服务端必须将 CONNACK 报文中的 Session Present 字段设置为 0,以让客户端知晓它期望的会话不存在,如果客户端想要继续此网络连接,就必须丢弃它保存的会话状态。
  2. 当 Clean Start 设置为 1,客户端和服务端必须丢弃任何已存在的会话,并开始一个全新的会话。相应地,服务端也会把 CONNACK 报文中的 Session Present 字段设置为 0。

Session Expiry Interval

Session Expiry Interval 同样位于 CONNECT 报文的可变报头,不过它是一个可选的连接属性。它被用来指定会话在网络断开后能够在服务端保留的最长时间,如果到达过期时间但网络连接仍未恢复,服务端就会丢弃对应的会话状态。它有三个典型的值:

  1. 没有指定此属性或者设置为 0,表示会话将在网络连接断开时立即结束。
  2. 设置为一个大于 0 的值,则表示会话将在网络连接断开的多少秒之后过期。
  3. 设置为 0xFFFFFFFF,即 Session Expiry Interval 属性能够设置的最大值时,表示会话永不过期。

每个 MQTT 客户端都可以独立设置自己的 Session Expiry Interval,比如一部分客户端不需要持久会话,一部分客户端只需要会话保留几分钟来避免网络波动带来的影响,而另一部分客户端则可能需要会话保留更长的时间。

MQTT 还允许客户端在断开连接时更新会话过期时间,这主要依靠 DISCONNECT 报文中 Session Expiry Interval 属性实现。比较常见的一个应用场景是,客户端在上线时将会话过期时间设置为一个大于 0 的值,避免网络中断影响正常业务。然后在客户端完成所有业务主动下线时,将会话过期时间更新为 0,这样服务端也可以及时地释放会话。

会话与Client ID

服务端使用 Client ID 来唯一地标识每个会话,如果客户端想要在连接时复用之前的会话,那么必须使用与此前一致的 Client ID。所以当我们使用服务端自动分配 Client ID 的功能时,客户端必须将 CONNACK 报文中返回的 Assigned Client Identifier 保存下来以供下次使用。

注意,MQTT 5.0 之前的协议版本并不支持服务端返回自动分配的 Client ID。

MQTT 3.1.1 中的 Clean Session

MQTT 3.1.1 中的会话机制,在灵活性上远不如 5.0。因为 3.1.1 只有一个 Clean Session 字段,且它只有 0 和 1 两个可取值,两者关系如下图:

image-20240928102007216

MQTT 3.1.1 中,会话的生命周期只有两种选项:永不过期或与网络连接保持一致。另外,是否需要创建全新的会话,也与会话的生命周期强行绑定在了一起,在 3.1.1 中,我们必须指定 Clean Session 为 1 和 0 各连接一次,才能让服务端创建一个全新的、持久的会话。MQTT 5.0 在会话方面的改进是巨大的。

会话持久性建议

在 MQTT 中,我们通常将生命周期长于网络连接的会话设置成持久会话。和Redis做缓存功能一样,我们的服务器资源是有限的,这一类工具本身并不是为大数据量设计的,特别是在会话特别多的使用场景,我们一定要及时清理不活跃的会话。

MQTT协议解析

MQTT协议报文分为三部分:固定报文、可变报文、有效载荷

  1. 固定报文:在所有MQTT数据包中,表示数据包类型及数据包的分组类标识。固定报文的长度并非固定不变,而是相对于可变报文来说,固定报文在不同类型的消息中,都固定有一个字节消息首部和一至四个字节的用于表示剩余长度的部分。总长2到5个字节。
  2. 可变报文:在部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。在连接Broker时包括协议名称、协议版本、连接标记、心跳间隔时长等内容;在发布消息时包括主题名称、消息ID 报文信息;在连接确认时含有连接返回码报文。其他类型消息没有报文内容。
  3. 有效载荷:在部分MQTT数据包中,表示客户端收到的具体内容。

整体 MQTT 的消息格式如下图所示:

img

固定报头

固定报头由报文类型、标识位和报文剩余长度三个字段组成。第一部分1个字节,从高位到低位分别是消息类型(占 4bit)、重传标识(占 1 bit)、Qos 标识(占 2bit)、保留位 (占1bit);第二部分1到4字节,采用Varint编码。

image-20240928211459059

控制报文类型16种:

image-20240928204002694

image-20240928212203207

重传标识

第三位为重传标识(DUP),如果值为 1 表示这个数据包是一条重复的消息,否则该数据包就是第一次发布。

Qos发布消息服务质量

服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时,连接已经在一定程度上受到保护。但是在无线网络中,中断和干扰很频繁,MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器,则客户端将是发送者,MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时,服务器是发送者,客户端是接收者。

第 2 至 1 位表示发布消息的服务质量,它有 四个值分别是:

  • 00:Qos0 最多一次,即:<=1
  • 01:Qos1 至少一次,即:>=1
  • 10:Qos2 刚好一次,即:=1
  • 11:预留

可变报头

可变报头(Variable Header)位于固定报头之后、有效载荷之前。可变报文的内容和格式根据具体的报文类型而变化,也就是说,不同的MQTT控制报文可能包含不同的可变报文字段或不包含可变报文。

可变报头按下列次序包含四个字段:

  1. 协议名(Protocol Name)。前两个字节为协议名称长度,后面的字节为协议名称。6个字节。
  2. 协议级别(ProtocolLevel)。1个字节。
  3. 连接标志(Connect Flags)。1个字节。
  4. 保持连接(Keep Alive)。2个字节。

image-20240928213809602

其中连接标识8个bit位细化展示如下:

image-20240928213550248

有效载荷

最后是有效载荷部分。我们可以将报文的可变报头看作是它的附加项,而有效载荷则用于实现这个报文的核心目的。

参考阅读:

https://blog.csdn.net/aaaaannnnnn_/article/details/135541092

https://zhuanlan.zhihu.com/p/639072664

MQTT示例

在一台已经装有mosquitto的Linux系统上做下列实验

1
2
3
4
5
6
# 以QoS=2订阅topic1的消息,禁止清除会话,意味着消息持久化和断线重连的时候发送历史消息
mosquitto_sub -v -c -i "clientid1" -t "topic1" -q 2 -u chende -P xxx
mosquitto_sub -v -c -i "clientid2" -t "topic1" -q 2 -u chende -P xxx

# 向topic1发布消息,重复发多次。关闭订阅,多发几次,再打开订阅看效果。
mosquitto_pub -u chende -P xxx -t "topic1" -q 2 -m "Hello World 123"

(完)