锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

MQTT协议原理简介与代码实现

时间:2022-12-24 12:30:00 pw传感器


文章目录

  • 1. 前言
  • 2. 简介
  • 3. 实现方式
  • 4. 特性
    • 4.1 Qos 0的工作原理
    • 4.2 Qos 1的工作原理
    • 4.3 Qos 2的工作原理
  • 5. 事件上报
  • 6. 代码实现
    • 6.1 发布者
    • 6.2 订阅者
  • 7. 实验现象
    • 7.1 发布者
    • 7.2 订阅者
  • 参考文献


1. 前言

物联网 (Internet of things(IoT)), 不同物理对象通过各种信息传感器和无线网络技术互联。
物联网设备及PC性能低于服务器。
与局域网带宽相比,传输带宽小,速度低。
实现物联网各设备之间的信息传输,MQTT这是一个非常合适的工具。


2. 简介

物联网 (Internet of things(IoT)), 不同物理对象通过各种信息传感器和无线网络技术互联。
物联网设备及PC性能低于服务器。
与局域网带宽相比,传输带宽小,速度低。
实现物联网各设备之间的信息传输,MQTT这是一个非常合适的工具。

3. 实现方式

实现MQTT在通信过程中,需要完成客户端和服务器端的通信,MQTT协议中有三种身份:出版商:( publisher )、代理(broker)、订阅者(subscriber)。
其中,新闻发布者和订阅者都是客户端,新闻代理是服务器,新闻发布者可以同时是订阅者。
MQTT传输的信息分为:主题(Topic)和负载(payload)两部分:
(1)Topic:订阅的主题,即channel,频道;
(2)payload:新闻内容,出版商向订阅者发布的具体新闻。
以下场景为例,方便大家理解以上文字:
在这里插入图片描述
如上图所示,subscribers1和subscribers订阅的主题是topic1和topic2,publisher1向topic这个主题发送数据,所以只有subscribers1才能收到,subscribers2是收不到topic这个主题的消息。topic也是如此。
现在问题来了,怎么样MQTT如何保证信息的可靠传输?

4. 特性

MQTT支持QOS(quality of service 服务质量),提供可靠的信息传输。
Qos 0:信息最多发送一次。
Qos 1:消息至少发送一次。
Qos 消息只发一次。

4.1 Qos 0的工作原理


从上图可以看出,Qos最多发一次消息,发完信息就不管了。是一种尽力而为的服务类型。

4.2 Qos 1的工作原理


从上图可以看出,Qos 1.工作时会有确认机制,出版商会向代理服务器发送信息。如果收到代理服务器,需要回应PUBACK报纸表示你收到的数据。当出版商收到它时PUBACK数据时,会将本地缓存的信息删除。
当然,如果代理没有收到数据,出版商也没有收到数据PUBACK发布者将在一段时间后重传响应信息。
代理和订阅者也是如此。
在使用Qos 1点,订阅者可能会收到重复的信息,因此Qos 1适用于客户端可以接收和处理重复信息的场景。

4.3 Qos 2的工作原理


如上图所示,Qos 发布者的信息只能发送一次,以避免重复。
同时,虽然信息只发送一次,但会有多种确认机制,以确保可靠性。
综上所述,从0到2,可靠性逐渐提高。

5. 事件上报


当虚拟机的生命周期发生变化时,会发生事件通知。但事件发生时,默认情况下不会传输到前端界面。因此,需要帮助mqtt协议,协助信息传递。

6. 代码实现

6.1 发布者

#include  #include  #include  #include  #include  #include  #include "mosquitto.h"  #define HOST "127.0.0.1" #define PORT  1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE  512   static int  myEventCallback(virConnectPtr conn,         virDomainPtr dom,         int event,         int detail,         void *opaque)  {     const char *name = virDomainGetName(dom);     struct tm *currTime;     time_t now;     time(&now);     currTime = localtime(&now);      int ret;     struct mosquitto *mosq;     char buff[MSG_MAX_SIZE];        ret = mosquitto_lib_init();      mosq =  mosquitto_new("Pub", true, NULL);      char username[] = "username";     char password[] = "123456";     int authRet = mosquitto_username_pw_set(mosq, username, password);      ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);      printf("Start!\n");      snprintf(buff, MSG_MAX_SIZE, "%d/%d/%d %d:%d:%d event(%d) occurred in the domain = < %s >.\n", currTime->tm_year   1900, currTime->tm_mon   1, currTime->tm_mday, currTime->tm_hour, currTime->tm_min, currTime->tm_sec, event, name);          mosquitto_publish(mosq, NULL, "topic1", strlen(buff)   1, buff, 0, 0);      mosquitto_disconnect(mosq);     mosquitto_destroy(mosq);     mosquitto_lib_cleanup();     printf("End!\n");      return 0; }  static void* eventThreadLoop() {     while(1) {         if(virEventRunDefaultImpl() < 0) {             printf("Run errer.\n");         }     }     abort(); }  int main() {      virConnectPtr conn = NULL;      int eventid = VIR_DOMAIN_EVENT_ID_LIFECYCLE;      virEventRegisterDefaultImpl();      conn = virConnectOpen(NULL);         pthread_t eventThread;      pthread_create(&eventThread, NULL, eventThreadLoop, NULL);      int id = virConnectDomainEventRegisterAny(conn,                  NULL,            eventid,            VIR_DOMAIN_EVENT_CALLBACK(myEventCallback),            NULL,            NULL);      while(1) pause();      virConnectDomainEventDeregisterAny(conn, id);      virConnectClose(conn);      return 0; }  

a id="62__161">6.2 订阅者

#include 
#include 
#include "mosquitto.h"

#define HOST "127.0.0.1"
#define PORT 1883
#define KEEPALIVE 60

void my_connect_callback(struct mosquitto *mosq, void *obj, int rec) {
    printf("Call the function: my_connect_callback.\n");
    if(rec){
        printf("On_connect error.\n");
        exit(1);
    } else {
	if(mosquitto_subscribe(mosq, NULL, "topic1", 0)) {
            printf("Set topic error.\n");
	    exit(1);
        }
    }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rec) {
    printf("Call the function: my_disconnect_callback.\n");
}

void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int*granted_qos) {
    printf("Call the function: my_subscribe_callback.\n");
}

void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {
    printf("Call the function: my_message_callback.\n");
    
    printf("Receive a message %s: %s", (char*)msg->topic, (char*)msg->payload);
}

int main() {
    int ret = 0;
    struct mosquitto *mosq;

    ret = mosquitto_lib_init();
    if(ret) {
        printf("init lib is failed.\n");
        return -1;
    }

    mosq = mosquitto_new("sub", true, NULL);
    if(mosq == NULL) {
        printf("create new mosquitto instance failed.\n");
        mosquitto_lib_cleanup();
        return -1;
    }

    const char username[] = "username";
    const char password[] = "123456";
    ret = mosquitto_username_pw_set(mosq, username, password);
    if(ret) {
        printf("username or password is wrong.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return -1;
    }

    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);

    ret = mosquitto_connect(mosq, HOST, PORT, KEEPALIVE);
    if(ret) {
        printf("connect to broker is failed.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return -1;       
    }

    printf("Subscribe begin.\n");
    int loop = mosquitto_loop_start(mosq);
    if(loop) {
        printf("mosquitto loop error.\n");
        return -1;
    }

    const char cmd[10];
    while(1) {
        scanf("%s", cmd);
        if(0 == strcmp(cmd, "quit")) {
            running = 0;
        }
    }

    mosquitto_disconnect(mosq);    
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    printf("Subscribe end.\n");
    return 0;   
}

7. 实验现象

7.1 发布者

对虚拟机进行操作

发布者显示信息

7.2 订阅者

接收发布者发送的虚拟机事件信息。

参考文献

[1] https://blog.p2hp.com/archives/4100
[2] https://blog.csdn.net/qq_33406883/article/details/107466430
[3] https://www.cnblogs.com/sxkgeek/p/9140180.html

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章