找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

656

积分

0

好友

89

主题
发表于 前天 04:10 | 查看: 12| 回复: 0

源码结构

源码仅包含 mqtt_client_tiny.cmqtt_client_tiny.h 两个文件,可直接集成到项目中。example.c 为测试案例,不需要可移除。

核心模块与移植接口

日志调试

框架提供了三个日志宏定义,用于不同级别的信息输出。无需打印功能时,注释掉宏定义中的 printf 语句即可,或将其替换为目标平台的日志接口。

#define mqtt_log_d(...)      //printf(__VA_ARGS__);
#define mqtt_log_i(...)      printf("[mqtt](%d) ", __LINE__); printf(__VA_ARGS__);
#define mqtt_log_e(...)      printf("[mqtt-err](%d)", __LINE__); printf(__VA_ARGS__);

内存管理

内存申请与释放接口通过宏定义实现,用户需根据自身平台的内存管理函数进行替换。

#define PLATFORM_MALLOC(size)        malloc(size)
#define PLATFORM_FREE(size)          free(size)

互斥锁

互斥锁接口用于保证多线程环境下的数据安全,主要控制发送缓冲区及Socket发送操作的互斥访问。需要实现初始化、销毁、上锁和解锁四个基本操作。

static int platform_mutex_init(platform_mutex_t* m){
    return pthread_mutex_init(&(m->mutex), NULL);
}
static int platform_mutex_destroy(platform_mutex_t* m){
    return pthread_mutex_destroy(&(m->mutex));
}
static int platform_mutex_lock(platform_mutex_t* m){
    return pthread_mutex_lock(&(m->mutex));
}
static int platform_mutex_unlock(platform_mutex_t* m){
    return pthread_mutex_unlock(&(m->mutex));
}

platform_mutex_destroy() 用于客户端销毁时释放资源,若无此需求可不实现。

线程管理

框架需要一个独立的后台管理线程来处理网络连接、订阅、重连、心跳及数据接收等任务。platform_thread_init() 用于创建此线程,platform_thread_cancel() 用于销毁线程。这两个接口的形式和内容均可按目标平台的线程API进行修改。

static int platform_thread_init(void *thread, void (*entry)(void *), void *arg){
    return pthread_create((pthread_t *)thread, NULL, (void *(*)(void*))entry, arg);
}
static int platform_thread_cancel(platform_pthread_t thread){
    pthread_cancel((pthread_t)thread);
}

TCP网络通信

稳定可靠的网络通信是MQTT连接的基础,涉及到网络编程的核心概念如非阻塞连接、超时处理和错误恢复。以下实现采用非阻塞连接结合select进行超时控制,关键在于准确区分网络故障与操作超时,这是实现稳定断网重连机制的前提。

static void mytcp_disconnect(int socket_fd){
    int ret;
    if(socket_fd < 0) return; //避免关闭无效描述符
    do {
        shutdown(socket_fd, SHUT_RDWR);
        ret = close(socket_fd);
    }while(ret == -1 && errno == EINTR); //处理信号中断
}
static int mytcp_connect(const char *host, uint16_t port, int timeout){
    // ... (详细连接实现代码)
}
static int mytcp_write(int socket_fd, uint8_t *buf, int len, int timeout){
    // ... (详细发送实现代码)
}
static int mytcp_read(int socket_fd, uint8_t *buf, int len, int timeout){
    // ... (详细接收实现代码)
}

注意:网络接口的健壮性直接影响系统稳定性,实现时需充分考虑服务器故障、网络延迟、异常断开等各种边界情况,确保连接具备自恢复能力。

定时器

提供简单的定时功能,用于心跳包发送与接收超时判断。接口对精度要求不高,易于移植。

typedef struct{
    struct timeval time;
}platform_timer_t;
static void platform_timer_usleep(uint32_t usec){
    usleep(usec);
}
static void platform_timer_init(platform_timer_t* timer){
    timer->time = (struct timeval){0, 0};
}
static char platform_timer_is_expired(platform_timer_t* timer){
    struct timeval now, res;
    gettimeofday(&now, NULL);
    timersub(&timer->time, &now, &res);
    return ((res.tv_sec < 0) || (res.tv_sec == 0 && res.tv_usec <= 0));
}
static void platform_timer_cutdown(platform_timer_t* timer, uint32_t timeout){
    struct timeval now;
    gettimeofday(&now, NULL);
    struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
    timeradd(&now, &interval, &timer->time);
}

链表

链表用于管理待订阅的Topic及其对应的消息处理回调函数。这些接口为通用实现,通常无需修改。

//链表实现相关宏与函数
#define LIST_ENTRY(list, type, field) \
    ((type *)((unsigned char *)(list) - ((size_t)&(((type *)0)->field))))
#define LIST_FOR_EACH_SAFE(curr, next, list) \
    for (curr = (list)->next, next = curr->next; curr != (list); curr = next, next = curr->next)
// ... (链表初始化、添加、删除等函数实现)

客户端核心API

初始化与销毁

MqttClient_t *mqtt_init(const char* clientid,
                        const char* username, const char* password,
                        char* host, uint16_t port,
                        uint32_t rbuflen, uint32_t wbuflen);
int mqtt_destroy(MqttClient_t* c);

初始化时需确定所有参数(客户端ID、认证信息、服务器地址、缓冲区大小),以简化后续逻辑。mqtt_destroy() 在无需动态销毁客户端的场景下可不实现。

订阅Topic设置

int mqtt_set_topic(MqttClient_t *c, const char *topic, msg_handler_t handler);

将Topic及其消息处理回调函数添加到链表中。当前实现仅支持QoS 0。

连接控制

int mqtt_set_connect(MqttClient_t* c, bool start);

通过设置标志位,通知后台管理线程启动或断开连接。

数据发布

int mqtt_publish(MqttClient_t* c, const char *topic, uint8_t *data, uint32_t datalen);

用于发送应用数据。发送前会校验数据长度及缓冲区容量,超限则直接返回失败,确保数据的原子性(全部发送或失败)。

后台管理线程

后台线程是一个状态机,负责维护客户端的整个生命周期(连接、订阅、就绪、断开等状态)。

static void mqtt_thread_main(void* arg){
    ...
    //线程分离,分离后主进程退出,它也会退出
    pthread_detach(pthread_self());
    while(1){
        switch(c->client_state){
            case(CS_CONNECTING):{ ... break; }
            case(CS_SUBSCRIBING):{ ... break; }
            case(CS_RREADY):{ ... break; }
            case(CS_DISCONNECTING):{ ... } // 注意:此处无break,将连贯执行CS_CLOSESOCKET
            case(CS_CLOSESOCKET):{ ... break; }
            case(CS_IDLE):
            default:{
                if(c->connect_start == true){
                    c->client_state = CS_CONNECTING;
                    continue;
                }
                platform_timer_usleep(500*1000); // 空闲状态延时,避免空转消耗CPU
            }
        }
    }
}

实现要点

  1. pthread_detach 用于自动回收线程资源,这一设计主要是考虑到线程异常退出时的资源释放问题。在其他RTOS平台上若无此接口可移除,并在 mqtt_destroy() 中实现资源回收。
  2. CS_DISCONNECTING 状态后未使用 break,使其能连贯执行 CS_CLOSESOCKET 状态的操作。
  3. 空闲状态下的 platform_timer_usleep 延时值需权衡:过长影响连接控制的响应速度,过短则增加CPU负载。

测试验证

使用 NanoMQ 作为 MQTT 服务器进行测试。编译运行 example.c 后,可观察到正常的消息收发。 MQTT测试结果

关于长期运行的稳定性,将在后续使用中持续验证。如在使用中发现任何问题或有改进建议,欢迎反馈。




上一篇:CentOS/RHEL系统yum包管理实战:从配置到常见操作指南
下一篇:3款最小Linux发行版详解:TinyCore、4MLinux与Puppy的轻量级实战
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-11 02:07 , Processed in 0.095075 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表