编辑
2025-04-18
记录知识
0
请注意,本文编写于 70 天前,最后修改于 59 天前,其中某些信息可能已经过时。

目录

拉代码
编写发布者程序
编写订阅者程序
将简单发布者和简单订阅者运行起来
协议栈开发
代理(Broker)
订阅者
发布者
协议特性
总结

之前的内容已经能够在基于开源组件mosquitto 来进行mqtt的演示和利用了,但是一个产品功能并不是简单使用使用就完事儿了,所以需要开发。 而开发又分为两部分: 一个是协议栈开发,也就是针对官方协议文档去实现这份协议框架 另一个是应用开发,也就是针对协议栈进行产品应用场景的代码开发

这里我从网上找到了别人开发好的协议栈源码(cMQTT),在借助他人源码的基础之上,开发一个简单的应用程序,实现上述开源组件发布和订阅的基本功能,然后在此基础上衍生,大家一起探讨探讨,如何自己开发这个MQTT协议栈。

拉代码

git clone https://github.com/YorkJia/cMQTT.git

配置和编译

./confgure.sh && make install

这时候就会出现libcMQTT.so。这就可以利用它,去简单的编写一些代码了

编写发布者程序

  1. 创建文件
touch simple_pub.c
  1. 写个小例子
int main(int argc, char *argv[]) { int res, loop_cnt = 0, cnt = 0; mqtt_client_t *pclient = NULL; pclient = mqtt_client_new("127.0.0.1", 1883, NULL, "client/01", "kylin", "qwe123"); mqtt_set_will_opt(pclient, MQTT_QOS0, 0, "test/a", NULL); do{ printf("try to connect broker...\n"); res = mqtt_client_connect(pclient); sleep(1); }while(res != SUCCESS_RETURN); while (1) { example_publish(pclient, "Hello World"); mqtt_client_yield(pclient, 1000); } mqtt_client_close(pclient);} int example_publish(mqtt_client_t *pclient, char* data) { char payload[100]; if(pclient == NULL){ return FAIL_RETURN; }   memset(payload, 0, sizeof(payload)); sprintf(payload, data);   mqtt_publish_simple(pclient, "test/a", MQTT_QOS0, payload, strlen(payload)); }
  1. 开始编译
gcc simple_pub.c -I ../../infra/ -I ../../mqtt/ -L ../../ -lcMQTT -lpthread -o simple_pub
  1. 测试
mosquitto_sub -h 127.0.0.1 -t "test/a" -u kylin -P qwe123 ./simple_pub

可以看到mosquitto能够正常接收到hello world字串

image.png

至此,一个最简单的发布者代码已经编译完成了

编写订阅者程序

  1. 创建文件
touch simple_sub.c
  1. 写个小例子
int main(int argc, char *argv[]) { int res, loop_cnt = 0, cnt = 0; mqtt_client_t *pclient = NULL; pclient = mqtt_client_new("127.0.0.1", 1883, NULL, "client/02", "kylin", "qwe123"); mqtt_set_will_opt(pclient, MQTT_QOS0, 0, "test/a", NULL); do{ printf("try to connect broker...\n"); res = mqtt_client_connect(pclient); sleep(1); }while(res != SUCCESS_RETURN); while (1) { res = mqtt_subscribe(pclient, "test/a", MQTT_QOS0, example_message_arrive, NULL); if(res < 0){ printf("subscribe[%s] fail.\n", "test/a"); } mqtt_client_yield(pclient, 1000); } mqtt_client_close(pclient); } void example_message_arrive(void *pcontext, void *pclient, mqtt_event_msg_t *msg) { mqtt_topic_info_t *topic_info = (mqtt_topic_info_t *)msg->msg; switch (msg->event_type) { case IOTX_MQTT_EVENT_PUBLISH_RECEIVED: /* print topic name and topic message */ printf("Message Arrived:\n"); printf("Topic : %.*s\n", topic_info->topic_len, topic_info->ptopic); printf("Payload: %.*s\n", topic_info->payload_len, topic_info->payload); break; default: break; } }
  1. 编译
gcc simple_sub.c -I ../../infra/ -I ../../mqtt/ -L ../../ -lcMQTT -lpthread -o simple_sub
  1. 测试 借用之前命令来发布主题消息
./simple_sub mosquitto_pub -h localhost -V mqttv31 -t 'test/a' -u kylin -P qwe123 -i "c3" -m "Hello World" -M 0

image.png

至此,一个最简单的订阅者代码已经编译完成了

将简单发布者和简单订阅者运行起来

  1. 运行Broker
mosquitto -c /etc/mosquitto/mosquitto.conf -d
  1. 运行示例程序
./simple_sub ./simple_pub

image.png

image.png

协议栈开发

说起协议栈的开发,与应用开发相比,一般都会是一个比较大的工程。就好像做菜。 应用开发者就好比去菜市场买菜,回家烹饪(或者点个外卖)。 协议栈开发者就好比从菜种子开始种,直到成熟后,再摘菜回家烹饪。

此文章不讨论实现MQTT的细节和展示协议栈实现的具体代码。而是和大家一起讨论讨论,怎么给菜园子松松土,把种子播进去。

代理(Broker)

从协议来看,代理的职责是接收订阅者的请求,将消息发送给订阅者,接收发布者的消息。 所以实现代理的方式应该大致如下:

image.png

其中,发布者和订阅者都是通过连接的方式接入Broker,epoll负责接收所有的事件数据,并解析和转发响应数据

订阅者

从协议来看,订阅者的职责是:设置QoS质量,选择需要订阅的主题,并将其封装成报文发送给代理去解析。然后回调接收订阅的消息。 所以实现订阅者的方式应该大致如下:

image.png

其中,回调处理主要为业务处理逻辑,如获取的温度,湿度,亮度等信息需要如何处理。 封装和解析的报文包括:CONNECT,CONNACK,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK,PINGREQ,DISCONNECT

发布者

从协议来看,发布者的职责是:设置QoS质量,选择需要订阅的主题,并将其封装成报文发送给代理去解析。而发布者同时也可做订阅者,所以也可以回调接收订阅的消息 所以实现发布者(仅发布)的方式应该大致如下:

image.png

封装和解析的报文包括: CONNECT,CONNACK,PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,PINGREQ,DISCONNECT

协议特性

这里列举需要实现的协议特性:
1.主题设置:合理的设置主题和判断主题
2.主题过滤器:合理的使用通配符
3.会话管理:Broker合理的管理和调度众多的会话连接
4.保持连接:合理的判断连接是否持续
5.临终遗嘱:合理的执行遗嘱内容,包括主题,消息
6.响应:合理的设置响应,如服务端在合理的时间内收不到connect报文,应该主动管理会话
7.Qos等级:合理的运用QoS设置报文发送方式
8.清理会话:合理的保留和遗弃上一次会话的消息
9.保留消息:合理的判断是否保留上一次发送的消息

总结

至此,我们演示了MQTT的应用场景,我们既需要基于MQTT开发应用,又需要根据MQTT定制协议