源码结构
源码仅包含 mqtt_client_tiny.c 和 mqtt_client_tiny.h 两个文件,可直接集成到项目中。example.c 为测试案例,不需要可移除。
核心模块与移植接口
日志调试
框架提供了三个日志宏定义,用于不同级别的信息输出。无需打印功能时,注释掉宏定义中的 printf 语句即可,或将其替换为目标平台的日志接口。
#define mqtt_log_d(...) //printf(__VA_ARGS__);
#define mqtt_log_i(...) printf("[mqtt](%d) ", __LINE__); printf(__VA_ARGS__);
#define mqtt_log_e(...) printf("[mqtt-err](%d)", __LINE__); printf(__VA_ARGS__);
内存管理
内存申请与释放接口通过宏定义实现,用户需根据自身平台的内存管理函数进行替换。
#define PLATFORM_MALLOC(size) malloc(size)
#define PLATFORM_FREE(size) free(size)
互斥锁
互斥锁接口用于保证多线程环境下的数据安全,主要控制发送缓冲区及Socket发送操作的互斥访问。需要实现初始化、销毁、上锁和解锁四个基本操作。
static int platform_mutex_init(platform_mutex_t* m){
return pthread_mutex_init(&(m->mutex), NULL);
}
static int platform_mutex_destroy(platform_mutex_t* m){
return pthread_mutex_destroy(&(m->mutex));
}
static int platform_mutex_lock(platform_mutex_t* m){
return pthread_mutex_lock(&(m->mutex));
}
static int platform_mutex_unlock(platform_mutex_t* m){
return pthread_mutex_unlock(&(m->mutex));
}
platform_mutex_destroy() 用于客户端销毁时释放资源,若无此需求可不实现。
线程管理
框架需要一个独立的后台管理线程来处理网络连接、订阅、重连、心跳及数据接收等任务。platform_thread_init() 用于创建此线程,platform_thread_cancel() 用于销毁线程。这两个接口的形式和内容均可按目标平台的线程API进行修改。
static int platform_thread_init(void *thread, void (*entry)(void *), void *arg){
return pthread_create((pthread_t *)thread, NULL, (void *(*)(void*))entry, arg);
}
static int platform_thread_cancel(platform_pthread_t thread){
pthread_cancel((pthread_t)thread);
}
TCP网络通信
稳定可靠的网络通信是MQTT连接的基础,涉及到网络编程的核心概念如非阻塞连接、超时处理和错误恢复。以下实现采用非阻塞连接结合select进行超时控制,关键在于准确区分网络故障与操作超时,这是实现稳定断网重连机制的前提。
static void mytcp_disconnect(int socket_fd){
int ret;
if(socket_fd < 0) return; //避免关闭无效描述符
do {
shutdown(socket_fd, SHUT_RDWR);
ret = close(socket_fd);
}while(ret == -1 && errno == EINTR); //处理信号中断
}
static int mytcp_connect(const char *host, uint16_t port, int timeout){
// ... (详细连接实现代码)
}
static int mytcp_write(int socket_fd, uint8_t *buf, int len, int timeout){
// ... (详细发送实现代码)
}
static int mytcp_read(int socket_fd, uint8_t *buf, int len, int timeout){
// ... (详细接收实现代码)
}
注意:网络接口的健壮性直接影响系统稳定性,实现时需充分考虑服务器故障、网络延迟、异常断开等各种边界情况,确保连接具备自恢复能力。
定时器
提供简单的定时功能,用于心跳包发送与接收超时判断。接口对精度要求不高,易于移植。
typedef struct{
struct timeval time;
}platform_timer_t;
static void platform_timer_usleep(uint32_t usec){
usleep(usec);
}
static void platform_timer_init(platform_timer_t* timer){
timer->time = (struct timeval){0, 0};
}
static char platform_timer_is_expired(platform_timer_t* timer){
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&timer->time, &now, &res);
return ((res.tv_sec < 0) || (res.tv_sec == 0 && res.tv_usec <= 0));
}
static void platform_timer_cutdown(platform_timer_t* timer, uint32_t timeout){
struct timeval now;
gettimeofday(&now, NULL);
struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
timeradd(&now, &interval, &timer->time);
}
链表
链表用于管理待订阅的Topic及其对应的消息处理回调函数。这些接口为通用实现,通常无需修改。
//链表实现相关宏与函数
#define LIST_ENTRY(list, type, field) \
((type *)((unsigned char *)(list) - ((size_t)&(((type *)0)->field))))
#define LIST_FOR_EACH_SAFE(curr, next, list) \
for (curr = (list)->next, next = curr->next; curr != (list); curr = next, next = curr->next)
// ... (链表初始化、添加、删除等函数实现)
客户端核心API
初始化与销毁
MqttClient_t *mqtt_init(const char* clientid,
const char* username, const char* password,
char* host, uint16_t port,
uint32_t rbuflen, uint32_t wbuflen);
int mqtt_destroy(MqttClient_t* c);
初始化时需确定所有参数(客户端ID、认证信息、服务器地址、缓冲区大小),以简化后续逻辑。mqtt_destroy() 在无需动态销毁客户端的场景下可不实现。
订阅Topic设置
int mqtt_set_topic(MqttClient_t *c, const char *topic, msg_handler_t handler);
将Topic及其消息处理回调函数添加到链表中。当前实现仅支持QoS 0。
连接控制
int mqtt_set_connect(MqttClient_t* c, bool start);
通过设置标志位,通知后台管理线程启动或断开连接。
数据发布
int mqtt_publish(MqttClient_t* c, const char *topic, uint8_t *data, uint32_t datalen);
用于发送应用数据。发送前会校验数据长度及缓冲区容量,超限则直接返回失败,确保数据的原子性(全部发送或失败)。
后台管理线程
后台线程是一个状态机,负责维护客户端的整个生命周期(连接、订阅、就绪、断开等状态)。
static void mqtt_thread_main(void* arg){
...
//线程分离,分离后主进程退出,它也会退出
pthread_detach(pthread_self());
while(1){
switch(c->client_state){
case(CS_CONNECTING):{ ... break; }
case(CS_SUBSCRIBING):{ ... break; }
case(CS_RREADY):{ ... break; }
case(CS_DISCONNECTING):{ ... } // 注意:此处无break,将连贯执行CS_CLOSESOCKET
case(CS_CLOSESOCKET):{ ... break; }
case(CS_IDLE):
default:{
if(c->connect_start == true){
c->client_state = CS_CONNECTING;
continue;
}
platform_timer_usleep(500*1000); // 空闲状态延时,避免空转消耗CPU
}
}
}
}
实现要点:
pthread_detach 用于自动回收线程资源,这一设计主要是考虑到线程异常退出时的资源释放问题。在其他RTOS平台上若无此接口可移除,并在 mqtt_destroy() 中实现资源回收。
CS_DISCONNECTING 状态后未使用 break,使其能连贯执行 CS_CLOSESOCKET 状态的操作。
- 空闲状态下的
platform_timer_usleep 延时值需权衡:过长影响连接控制的响应速度,过短则增加CPU负载。
测试验证
使用 NanoMQ 作为 MQTT 服务器进行测试。编译运行 example.c 后,可观察到正常的消息收发。

关于长期运行的稳定性,将在后续使用中持续验证。如在使用中发现任何问题或有改进建议,欢迎反馈。