第十章 MQTT
10.1 MQTT的历史背景¶
- 起源
MQTT(Message Queuing Telemetry Transport)由 IBM 的 Andy Stanford-Clark 和 Arcom 公司的 Arlen Nipper 于 1999 年 开发,最初用于石油管道的远程监控系统,旨在解决卫星通信中的高延迟和低带宽问题。
- 标准化进程
2013 年 :IBM 将 MQTT 3.1 提交至 OASIS(结构化信息标准促进组织),推动其成为开放标准。
2016 年 :MQTT 3.1.1 成为 OASIS 官方标准,并被广泛采用。
2019 年 :MQTT 5.0 发布,新增会话管理、原因码、共享订阅等功能,进一步提升协议能力。
ISO 标准 :MQTT 被纳入 ISO/IEC 20922 标准,成为国际认可的物联网通信协议。
MQTT从工业监控逐步扩展至智能家居、车联网、移动应用等领域,成为物联网(IoT)的核心协议。
10.2 MQTT的技术特性¶
-
轻量级设计
- 协议头最小仅 2 字节,适合资源受限的嵌入式设备和低带宽网络。
- 代码实现简单,占用内存少(如 Paho 客户端库仅需几十 KB)。
-
发布/订阅模型
- 解耦通信 :发布者与订阅者无需直接交互,通过 Broker 中转消息,提升系统扩展性。
- 异步通信 :支持海量设备同时连接,适应高并发场景。
-
灵活的服务质量(QoS)
QoS 等级 描述 适用场景 0 最多一次(可能丢失) 实时性高、允许丢包(如传感器数据) 1 至少一次(可能重复) 需可靠传输但允许冗余(如控制指令) 2 恰好一次(严格可靠) 金融交易、关键操作 -
网络适应性
- 支持断线重连和会话保持,确保弱网环境下的通信连续性。
- 通过心跳机制(
PINGREQ/PINGRESP
)检测连接状态。
-
安全机制
- 支持 TLS/SSL 加密传输,防止数据窃听。
- 提供用户名/密码认证和客户端证书鉴权。
10.3 广义发布订阅模型¶
10.3.1 发布订阅模型¶
三个重要的专业名称:
- Broker(代理服务器) :消息路由中心,负责接收、过滤和转发消息(如 Mosquitto、EMQX)。
- Publisher(发布者) :向指定主题(Topic)发送消息的设备或应用。
- **Subscriber(订阅者) ** :订阅主题以接收相关消息的终端。
10.3.2 发布订阅模式优势¶
-
系统解耦
- 空间解耦:发布者无需知道订阅者的IP/端口;订阅者无需感知发布者的存在。
- 时间解耦:发布/订阅双方无需同时在线;支持离线消息存储。
- 协议解耦:发布者与订阅者可使用不同编程语言。
-
主题过滤
- 单层匹配符:如
home/+/temperature
匹配home/livingroom/temperature
√,但不匹配home/floor1/room2/temperature
×。 - 多层匹配符:如
factory/#
匹配factory/machine1
√,同样匹配factory/buildingA/machine2/status
√。
- 单层匹配符:如
10.4 MQTT发布订阅模型¶
10.4.1 MQTT发布订阅模型¶
相比于广义发布订阅模型,MQTT还有一个后端设备,它是一个数据存储服务器,他们都订阅同一个 Address 主题。
10.4.2 MQTT基本概念¶
- MQTT客户端 :指连接到MQTT代理的所以设备,该设备发布消息就是发布者,订阅消息便是订阅者,也可以同时是订阅者和发布者。
- MQTT代理(Broker) :是一个消息中转服务器,负责接收、过滤和转发消息。
- MQTT连接 :订阅者和发布者通过TCP/IP协议与代理服务器建立连接。
- MQTT主题 :消息的路由标识,采用分层结构,用
/
分隔层级。 - 订阅 :客户端向Broker发送
SUBSCRIBE
报文,声明关注的主题。 - 发布 :客户端向Broker发送
PUBLISH
报文,消息包含主题和负载(Payload)。 - 取消订阅 :客户端发送
UNSUBSCRIBE
报文,移除对指定主题的订阅。 - 服务质量 :
- QoS 0:最多一次(可能丢失)。
- QoS 1:至少一次(可能重复)。
- QoS 2:恰好一次(可靠但开销大)。
10.5 Paho-MQTT¶
10.5.1 Paho-MQTT简介¶
Paho-MQTT 是 Eclipse 基金会维护的开源 MQTT 协议实现库,提供轻量级、跨平台的 MQTT 客户端支持,专为物联网和分布式系统设计。
特性 | 描述 |
---|---|
协议支持 | MQTT 3.1.1 / 5.0,支持 QoS 0-2 等级 |
跨平台 | 支持 Linux/Windows/嵌入式系统(FreeRTOS、OpenHarmony 等) |
多语言绑定 | 提供 C/C++、Python、Java、JavaScript 等实现 |
轻量化设计 | C 库体积 < 100KB(裁剪后),内存占用 < 20KB |
安全扩展 | TLS/SSL 加密支持(需搭配 OpenSSL/mbedTLS) |
断线管理 | 自动重连、遗嘱消息(LWT)、会话保持 |
GitHub地址:paho.mqtt.embedded-c
下载源码,放置在applications/sample/wifi-iot/app
目录:
同时建议将文件夹名称改为paho_mqtt
,方便后续操作。
10.5.2 源码配置¶
MQTTClient
是基于C++的MQTT库,而MQTTClient-C
则是基于C语言,故我们仅会使用到后者。
打开文件夹,我们会发现在src
目录中有一些热门操作系统适配的MQTT库,它们均依赖于MQTTPacket
库。不过很遗憾,并没有Open Harmony的适配,因此我们需要手动移植一些文件。
移植完成的paho-mqtt源码(来自书籍资料):
Danger
ohos适配部分由江苏润开鸿数字科技有限公司完成,其是润和软件投资的全资子公司,而润和软件与华为同为开源鸿蒙的A类捐赠者,并基于Open Harmony开发了HiHopeOS 操作系统。
src
添加了一个ohos适配目录。
samples
目录下添加了一些ohos示例程序,包括与AT命令相关的API文件,来自OpenHarmony1.0版本。
在vscode的c_cpp_properties.json
文件添加如下includePath
内容:
"${workspaceFolder}/applications/sample/wifi-iot/app/paho_mqtt/MQTTPacket/src",
"${workspaceFolder}/applications/sample/wifi-iot/app/paho_mqtt/MQTTClient-C/src",
"${workspaceFolder}/applications/sample/wifi-iot/app/paho_mqtt/MQTTClient-C/src/ohos"
APP编译脚本
import("//build/lite/config/component/lite_component.gni")
lite_component("app") {
features = [
"gn_practice/application:gn_app",
"gn_practice/driver:gn_driver",
"gn_practice/library:gn_library",
"kv_store_demo:kv_store_demo",
"file_demo:file_demo",
"thread_demo:thread_demo",
"timer_demo:timer_demo",
"mutex_demo:mutex_demo",
# GPIO模块
"gpio_demo:gpio_demo",
# ADC模块
"adc_demo:adc_demo",
# OLED模块
"oled_demo:oled_demo",
# WiFi模块
"wifi_demo:wifi_demo",
# MQTT模块
"paho_mqtt/MQTTClient-C:paho-mqttclient",
"paho_mqtt/MQTTPacket:paho-mqttpacket",
]
}
编译烧录测试移植没问题。
10.5.3 AT命令测试¶
我们编译的ohos示例程序是对Open Harmony的AT命令的拓展。在mqtt_test_cmsis.c
中的宏定义定义了增加的一些AT命令。
学习AT指令
在MobaXterm中输入AT+MQTT_CONN=
,回车后使用 Ctrl+J 组合键,会得到命令的使用方法:
10.5.4 EasyWiFI配置¶
将EasyWiFi模块配置到MQTT中,让WiFI连接更简单。
在vscode的c_cpp_properties.json
文件添加如下includePath
内容:
APP编译脚本
import("//build/lite/config/component/lite_component.gni")
lite_component("app") {
features = [
"gn_practice/application:gn_app",
"gn_practice/driver:gn_driver",
"gn_practice/library:gn_library",
"kv_store_demo:kv_store_demo",
"file_demo:file_demo",
"thread_demo:thread_demo",
"timer_demo:timer_demo",
"mutex_demo:mutex_demo",
# GPIO模块
"gpio_demo:gpio_demo",
# ADC模块
"adc_demo:adc_demo",
# OLED模块
"oled_demo:oled_demo",
# WiFi模块
"wifi_demo:wifi_demo",
# MQTT模块
"paho_mqtt/MQTTClient-C:paho-mqttclient",
"paho_mqtt/MQTTPacket:paho-mqttpacket",
# EasyWiFi模块
"easy_wifi/src:easy_wifi"
]
}
编译以后可能会报一个错误,根本原因是OpenHarmony版本不同,目录有所变化,我的版本为 3.0.7 。
在//applications/sample/wifi-iot/app/easy_wifi/src
目录下找到编译脚本,如下图的位置,修改wifi接口的·目录为//foundation/communication/wifi_lite/interfaces/wifiservice
,然后再次编译便没有问题。
10.6 MQTT连接华为云¶
发布信息格式
{
"services": [{
"service_id": "Upload",
"properties": {
"temperature": 25,
"humidity": 60,
"MQ2": 1
}
}
]
}
请求设备影子格式
mqtt_task.c
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "cmsis_os2.h"
// MQTTClient-C库接口文件
#include "MQTTClient.h"
// OHOS(LiteOS)适配接口文件
#include "mqtt_ohos.h"
// 自定义的接口文件
#include "mqtt_task.h"
// 定义一个宏,用于输出日志
#define LOGI(fmt, ...) printf(fmt "\n", ##__VA_ARGS__)
// MQTT客户端(MQTTClient.h)
static MQTTClient client = {0};
// MQTT网络连接(mqtt_ohos.h)
static Network network = {0};
// 接收和发送数据的缓冲区
static unsigned char sendbuf[512], readbuf[512];
// 标识任务循环运行与否
static int running = 1;
/**
* @brief MQTT任务循环
*
* @param arg MQTT客户端(MQTTClient)
*/
static void MqttTask(void* arg)
{
/* 输出开始日志 */
LOGI("[INFO] MqttTask start!");
/* 获取任务参数 */
MQTTClient* pClient = (MQTTClient*)arg;
/* 任务循环 */
while (pClient)
{
// paho_mqtt对互斥锁操作进行了一个简单的封装
// 当宏 MQTT_TASK 被定义后,MQTTClient结构体会多两个成员 mutex 和 thread
/* 获取互斥锁(mqtt_ohos_cmsis.c) */
mqttMutexLock(&pClient->mutex);
if (!running) {
// 退出任务循环
LOGI("[ERROR] MQTT background thread exit!");
/* 释放互斥锁 */
mqttMutexUnlock(&pClient->mutex);
break;
}
/* 释放互斥锁(mqtt_ohos_cmsis.c) */
mqttMutexUnlock(&pClient->mutex);
/* ---------------------------------------- */
/* 获取互斥锁 */
mqttMutexLock(&pClient->mutex);
// 客户端连接成功
if (pClient->isconnected) {
// 维持 MQTT 客户端的后台通信(MQTTClient.h)
MQTTYield(pClient, 100);
}
/* 释放互斥锁 */
mqttMutexUnlock(&pClient->mutex);
/* ---------------------------------------- */
// 等待 1 s(mqtt_ohos_cmsis.c)
Sleep(1000);
}
// 输出日志
LOGI("[ERROR] MqttTask exit!");
}
/**
* @brief 初始化 MqttTask,基于 MqttTask 创建一个线程
*
*/
void MqttTaskInit(void)
{
/* 初始化并启动MQTT客户端 */
// 网络初始化(mqtt_ohos_cmsis.c)
NetworkInit(&network);
// 客户端初始化(MQTTClient.h)
MQTTClientInit(&client, &network, 100, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
/* 创建MQTT线程 */
// paho_mqtt对创建线程操作进行了一个简单的封装
ThreadStart(&client.thread, MqttTask, &client);
LOGI("[INFO] MqttTaskInit done!");
}
/**
* @brief 停止MQTT任务
*
*/
void MqttTaskDeinit(void)
{
// 获取互斥锁
mqttMutexLock(&client.mutex);
// 标识变量归零
running = 0;
// 释放互斥锁
mqttMutexUnlock(&client.mutex);
// 删除互斥锁
mqttMutexDeinit(&client.mutex);
}
/**
* @brief 连接MQTT服务器(Broker)
*
* @param host 服务器地址
* @param port 服务器端口
* @param clientId 客户端ID
* @param username 用户名
* @param password 密码
* @return 0:成功,-1:失败
*/
int MqttTaskConnect(const char *host, unsigned short port,
const char *clientId, const char *username, const char *password)
{
/* 接收返回值变量 */
int rc = 0;
/* 初始化MQTT连接信息(MQTTPacket\MQTTConnect.h) */
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
/* 使用TCP socket连接MQTT服务器 */
rc = NetworkConnect(&network, (char*)host, port);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] NetworkConnect is %d", rc);
return -1;
}
/* 设置MQTT连接信息 */
if (username != NULL && password != NULL) {
connectData.username.cstring = (char*)username;
connectData.password.cstring = (char*)password;
// MQTT版本,3 = 3.1,4 = 3.1.1
connectData.MQTTVersion = 3;
connectData.clientID.cstring = (char*)clientId;
}
/* 发送MQTT连接包(MQTTClient.h) */
rc = MQTTConnect(&client, &connectData);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] MQTTConnect failed: %d", rc);
return -1;
}
/* 连接成功 */
LOGI("[INFO] MQTT Connected!");
return 0;
}
/* 主题订阅回调函数 */
static void MessageHander(MessageData* data)
{
// (MQTTClient.h)
printf("[INFO] Message Size:%d\r\n", (int)data->message->payloadlen);
LOGI("[INFO] Message arrived on topic:\r\nTopic: %.*s\r\n",
(int)data->topicName->lenstring.len, (char *)data->topicName->lenstring.data);
LOGI("[INFO] payload: %.*s\r\n", (int)data->message->payloadlen, (char *)data->message->payload);
}
/**
* @brief 订阅主题
*
* @param topic 主题
* @return 0:成功,-1:失败
*/
int MqttTaskSubscribe(char* topic)
{
/* 接收返回值变量 */
int rc = 0;
/* 输出日志 */
LOGI("[INFO] Subscribe: [%s] from broker", topic);
/* 发送订阅包 */
rc = MQTTSubscribe(&client, topic, QOS0, MessageHander);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] MQTTSubscribe failed: %d", rc);
return -1;
}
/* 订阅成功 */
return 0;
}
/**
* @brief 取消主题订阅
*
* @param topic 主题
* @return 0:成功,-1:失败
*/
int MqttTaskUnSubscribe(char* topic)
{
/* 接收返回值变量 */
int rc = 0;
/* 输出日志 */
LOGI("[INFO] UnSubscribe: [%s] from broker", topic);
/* 发送订阅包 */
rc = MQTTUnsubscribe(&client, topic);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] MQTTUnsubscribe failed: %d", rc);
return -1;
}
/* 取消订阅成功 */
return 0;
}
/**
* @brief 向指定主题发布消息
*
* @param topic 主题
* @param payload
* @return 0:成功,-1:失败
*/
int MqttTaskPublish(char *topic, char *payload)
{
/* 接收返回值变量 */
int rc = 0;
/* 定义MQTT消息数据包(MQTTClient.h) */
MQTTMessage message = {
.qos = QOS0,
.retained = 0,
.payload = payload,
.payloadlen = strlen(payload),
};
LOGI("[INFO] Publish: #'%s': '%s' to broker", topic, payload);
/* 发布消息 */
rc = MQTTPublish(&client, topic, &message);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] MQTTPublish failed: %d", rc);
return -1;
}
/* 发布成功 */
return 0;
}
/**
* @brief 断开与MQTT服务器的连接
*
* @return 0:成功,-1:失败
*/
int MqttTaskDisconnect(void)
{
/* 接收返回值变量 */
int rc = 0;
/* 发送断开连接数据包 */
rc = MQTTDisconnect(&client);
if (rc != 0) {
// 连接失败,输出日志并返回-1
LOGI("[ERROR] MQTTDisconnect failed: %d", rc);
return -1;
}
/* 断开和MQTT服务器的TCP socket连接 */
NetworkDisconnect(&network);
/* 断开连接成功 */
return 0;
}
mqtt_task.h
#ifndef MQTT_TASK_H
#define MQTT_TASK_H
void MqttTaskInit(void);
void MqttTaskDeinit(void);
int MqttTaskConnect(const char *host, unsigned short port,
const char *clientId, const char *username, const char *password);
int MqttTaskSubscribe(char* topic);
int MqttTaskUnSubscribe(char* topic);
int MqttTaskPublish(char *topic, char *payload);
int MqttTaskDisconnect(void);
#endif
mqtt_demo.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
// STA模式头文件
#include "wifi_connecter.h"
#include "mqtt_task.h"
#include "cJSON.h"
/**
* mqtt_demo/mqtt_demo.c
* MQTT示例
*/
/* WiFi信息 */
// 账号
#define HOTSPOT_SSID "OpenHarmony"
// 密码
#define HOTSPOT_PASSWD "123456789"
// 加密方式
#define HOTSPOT_TYPE WIFI_SEC_TYPE_PSK
/* 华为云信息 */
// MQTT连接参数
#define MQTT_HOST "9dfa5c258d.st1.iotda-device.cn-east-3.myhuaweicloud.com"
#define MQTT_PORT 1883
#define MQTT_CLIENT_ID "xxx"
#define MQTT_USERNAME "xxx"
#define MQTT_PASSWD "xxx"
#define MQTT_DEVICE_ID "677388cfbab900244b135588_DATAS"
#define MQTT_SERVICE_ID "Upload"
// HUAWEICLOUDE平台的话题定义
#define MQTT_PublishTopic "$oc/devices/677388cfbab900244b135588_DATAS/sys/properties/report"
#define MQTT_RequestTopic "$oc/devices/677388cfbab900244b135588_DATAS/sys/shadow/get/request_id={request_id}"
#define MQTT_SubscribeTopic "$oc/devices/677388cfbab900244b135588_DATAS/sys/shadow/get/response/#"
/**
* @brief 发布消息,上传温湿度、MQ2数据
*
* @param temp 温度
* @param humidity 湿度
* @param mq2 MQ2是否超阈值
* @return 形成的JSON发送格式
*/
static char* MqttPublishPayload(float temp, float humidity, int mq2) {
// 创建JSON结构
cJSON *root = cJSON_CreateObject();
cJSON *services = cJSON_CreateArray();
// 构建服务节点
cJSON *upload = cJSON_CreateObject();
cJSON_AddStringToObject(upload, "service_id", "Upload");
// 添加属性
cJSON *props = cJSON_CreateObject();
cJSON_AddNumberToObject(props, "temperature", temp);
cJSON_AddNumberToObject(props, "humidity", humidity);
cJSON_AddNumberToObject(props, "MQ2", mq2);
cJSON_AddItemToObject(upload, "properties", props);
// 组合结构
cJSON_AddItemToArray(services, upload);
cJSON_AddItemToObject(root, "services", services);
// 生成字符串
char *payload = cJSON_PrintUnformatted(root);
// 释放cJSON树(注意:不释放payload字符串)
cJSON_Delete(root);
return payload;
}
/**
* @brief
*
* @param device_id
* @param service_id
* @return char*
*/
static char* MqttRequestPayload(const char* device_id, const char* service_id) {
// 参数有效性检查
if (!device_id || !service_id || strlen(device_id) == 0 || strlen(service_id) == 0) {
return NULL;
}
// 创建JSON根对象
cJSON *root = cJSON_CreateObject();
if (!root) return NULL;
// 添加设备ID字段
cJSON_AddStringToObject(root, "object_device_id", device_id);
// 添加服务ID字段
cJSON_AddStringToObject(root, "service_id", service_id);
// 生成紧凑型JSON
char *payload = cJSON_PrintUnformatted(root);
// 清理cJSON结构
cJSON_Delete(root);
return payload;
}
static void MqttTaskDemo(void* arg)
{
/* 初始化WIFI参数 */
WifiDeviceConfig apConfig = {
// 热点名称
.ssid = HOTSPOT_SSID,
// 热点密码
.preSharedKey = HOTSPOT_PASSWD,
// 加密方式(PSK)
.securityType = HOTSPOT_TYPE,
};
/* 连接WIFI */
int netId = ConnectToHotspot(&apConfig);
if (netId < 0) {
printf("[ERROR] Connect to AP failed!\r\n");
return;
}
/* 初始化并启动MQTT任务,连接MQTT服务器 */
MqttTaskInit();
if (MqttTaskConnect(MQTT_HOST, MQTT_PORT, MQTT_CLIENT_ID, MQTT_USERNAME, MQTT_PASSWD) != 0) {
// 连接失败,输出错误信息并退出
printf("[ERROR] Connect to MQTT server failed!\r\n");
return;
}
/* 订阅主题 */
int rc = MqttTaskSubscribe(MQTT_SubscribeTopic);
if (rc != 0) {
// 订阅失败,输出错误信息并退出
printf("[ERROR] MQTT Subscribe failed!\r\n");
return;
} else {
// 输出订阅成功信息
printf("[INFO] MQTT Subscribe OK\r\n");
}
/* 发布请求信息 */
char* payload = MqttRequestPayload(MQTT_DEVICE_ID, MQTT_SERVICE_ID);
rc = MqttTaskPublish(MQTT_RequestTopic, payload);
if (rc != 0) {
// 发布失败,输出错误信息
printf("[ERROR] MQTT Request failed!\r\n");
return;
} else {
// 发布成功,输出成功信息
printf("[INFO] MQTT Request OK\r\n");
}
/* 发布消息 */
payload = MqttPublishPayload(17, 80, 0);
rc = MqttTaskPublish(MQTT_PublishTopic, payload);
if (rc != 0) {
// 发布失败,输出错误信息
printf("[ERROR] MQTT Publish failed!\r\n");
return;
} else {
// 发布成功,输出成功信息
printf("[INFO] MQTT Publish OK\r\n");
}
}
/* 入口函数 */
static void MqttEntry(void)
{
osThreadAttr_t attr = {
.name = "MqttTaskDemo",
.stack_size = 10240,
.priority = osPriorityNormal
};
if (osThreadNew(MqttTaskDemo, NULL, &attr) == NULL) {
printf("[ERROR] Thread Create Faild.\r\n");
}
}
SYS_RUN(MqttEntry);