锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

mosquitto源码分析

时间:2022-12-24 11:30:00 pw传感器

mosquitto源码分析

    • 一、mosquitto简介
    • 二、主要目录
    • 重要的数据结构
    • 四、部分常用函数内部含义

一、mosquitto简介

mosquitto是一款实现了消息推送协议MQTT v3.1 开源消息代理软件提供轻量级,支持可发布/可订阅的消息推送模式,简化设备之间的短信通信,如低功耗传感器、手机、嵌入式计算机微控制器等移动设备。
Mosquitto采用出版/订阅模式MQTT协议,该设计模式将通信终端之间的关系统一到服务程序中进行管理,可以大大降低客户端的开发和维护。

mqtt协议详细介绍:http://t.csdn.cn/IVtqa
源码地址:https://github.com/eclipse/mosquitto

二、主要目录

主要需要注意的是/mosquitto/src、/mosquitto/lib、/mosquitto/client。

其中/src和/lib服务端主要放置在目录下(Broker)与网络相关的部分底层实现代码和操作;client目录主要是订阅客户端和发布客户端的实现源代码。

mosquitto_internal.h定义各种数据结构
mosquitto:外部调用接口
memory_mosq:内存分配处理可记录内存用量
net_mosq:网络基础操作,tcp 创建、关闭等;包装/解包数据_mosquitto_packet 写入/读取各种数据
send_mosq:主要实现发送请求逻辑(协议组包),实际命令请求实现组包
send_client_mosq:与 send_mosq 类似地,高频接口主要用于客户端;
messages_mosq:主要针对新闻的实现(PUBLISH,PUBACK,PUBREL…)
read_handle:根据数据包的类型处理收到的数据包。

重要的数据结构

会话属性(上下文):主要用于保存客户端连接的所有信息,如用户id,用户名,客户端socket,ip地址、密码、连接时间值等

struct mosquitto {  mosq_sock_t sock;//mosquitto用于与客户端连接通信的服务器程序socket描述符 #ifndef WITH_BROKER  mosq_sock_t sockpairR, sockpairW;// socket管道通知:非阻塞模式时,通知用,在mosquitto_loop 调用发送, #endif #if defined(__GLIBC__) && defined(WITH_ADNS)  struct gaicb *adns; /* For getaddrinfo_a */ #endif  enum mosquitto__protocol protocol;  char *address;//客户端IP地址  char *id;//客户端登录mosquitto提供的程序ID值,该值不能与其他客户重复  char *username;//username和password记录客户登录时提供的用户名和密码  char *password;  uint16_t keepalive;///客户端此时需要内向mosquitto发送一个服务器程序ping/pong消息  uint16_t last_mid;///最后一个消息id,发消息后    enum mosquitto_client_state state;  time_t last_msg_in;///用于记录上次收发消息的时间  time_t next_msg_out;  time_t ping_t;  struct mosquitto__packet in_packet;///接收数据包  struct mosquitto__packet *current_out_packet;  struct mosquitto__packet *out_packet;///接收数据包  struct mosquitto_message_all *will;  struct mosquitto__alias *aliases;  struct will_delay_list *will_delay_entry;  uint32_t maximum_packet_size;  int alias_count;  uint32_t will_delay_interval;  time_t will_delay_time; #ifdef WITH_TLS  SSL *ssl;  SSL_CTX *ssl_ctx;  char *tls_cafile;  char *tls_capath;  char *tls_certfile;  char *tls_keyfile;  int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);  char *tls_version;  char *tls_ciphers;  char *tls_psk;  char *tls_psk_identity;  int tls_cert_reqs;  bool tls_insecure;  bool ssl_ctx_defaults;  bool tls_ocsp_required;  char *tls_engine;  char *tls_engine_kpass_sha1;  enum mosquitto__keyform tls_keyform;  char *tls_alpn; #endif  bool want_write;  bool want_connect; #if defined(WITH_THREADING) && !defined(WITH_BROKER)  pthread_mutex_t callback_mutex;  pthread_mutex_t log_callback_mutex;  pthread_mutex_t msgtime_mutex;  pthread_mutex_t out_packet_mutex;  pthread_mutex_t current_out_packet_mutex;  pthread_mutex_t state_mutex;  pthread_mutex_t mid_mutex;  pthread_t thread_id; #endif  bool clean_start;  uint32_t session_expiry_interval;  time_t session_expiry_time; #ifdef WITH_BROKER  bool removed_from_by_id; /* True if removed from by_id hash */  bool is_dropping;  bool is_bridge;  struct mosquitto__bridge *bridge;  struct mosquitto_msg_data msgs_in;///接收消息队列,保存收到的信息;加入这个队列主要是因为整个消息流程没有完成,后续的交互需要处理  struct mosquitto_msg_data msgs_out;//发送消息队列,保存发送消息或收到消息;加入队列主要是因为整个消息流程没有完成,还有后续交互需要处理;  struct mosquitto__acl_user *acl_list;  struct mosquitto__listener *listener;  struct mosquitto__packet *out_packet_last;  struct mosquitto__subhier **subs;  struct mosquitto__subshared_ref **shared_subs;  char *auth_method;  int sub_count;  int shared_sub_count;  int pollfd_index; #  ifdef WITH_WEBSOCKETS #    if defined(LWS_LIBRARY_VERSION_NUMBER)  struct lws *wsi; #    else  struct libwebsocket_context *ws_context;  struct libwebsocket *wsi; #    endif #  endif  bool ws_want_write;  bool assigned_id; #else #  ifdef WITH_SOCKS  char *socks5_host;  int socks5_port;  char *socks5_username;  char *socks5_password; #  endif  void *userdata;  bool in_callback;  struct mosquitto_msg_data msgs_in;  struct mosquitto_msg_data msgs_out;  void (*on_connect)(struct mosquitto *, void *userdata, int rc);  void (*on_connect_with_flags)(struct mosquitto *, void *userdata, int rc, int flags);  void (*on_connect_v5)(struct mosquitto *, void *userdata, int rc, int flags, const mosquitto_property *props);  void (*on_disconnect)(struct mosquitto *, void *userdata, int rc)
	void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
	void (*on_publish)(struct mosquitto *, void *userdata, int mid);
	void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
	void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
	void (*on_message_v5)(struct mosquitto *, void *userdata, const struct mosquitto_message *message, const mosquitto_property *props);
	void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
	void (*on_subscribe_v5)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props);
	void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
	void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props);
	void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
	//void (*on_error)();
	char *host;
	int port;
	char *bind_address;
	unsigned int reconnects;
	unsigned int reconnect_delay;
	unsigned int reconnect_delay_max;
	bool reconnect_exponential_backoff;
	char threaded;
	struct mosquitto__packet *out_packet_last;
#  ifdef WITH_SRV
	ares_channel achan;
#  endif
#endif
	uint8_t maximum_qos;

#ifdef WITH_BROKER
	UT_hash_handle hh_id;
	UT_hash_handle hh_sock;
	struct mosquitto *for_free_next;
	struct session_expiry_list *expiry_list_item;
#endif
#ifdef WITH_EPOLL
	uint32_t events;
#endif
};

消息状态
消息发送与接收流程用,关注 mosq_ms_wait_for_xxxx 状态,客户端处理此类消息

enum mosquitto_msg_state {
	mosq_ms_invalid = 0,
	mosq_ms_publish_qos0 = 1,
	mosq_ms_publish_qos1 = 2,
	mosq_ms_wait_for_puback = 3,//Oos==1时,发送PUBLISH后等待PUBACK返回
	mosq_ms_publish_qos2 = 4,
	mosq_ms_wait_for_pubrec = 5,//Oos==2时,发送PUBLISH后,等待PUBREC返回
	mosq_ms_resend_pubrel = 6,
	mosq_ms_wait_for_pubrel = 7,//Oos==2时,发送PUBREC后等待PUBREL返回
	mosq_ms_resend_pubcomp = 8,
	mosq_ms_wait_for_pubcomp = 9,//Oos==2时,发送PUBREL后等待PUBCOMP返回
	mosq_ms_send_pubrec = 10,
	mosq_ms_queued = 11
};

客户端状态:该状态为用户连接成功并通讯 CONNECT 之后结果;

enum mosquitto_client_state {
	mosq_cs_new = 0,
	mosq_cs_connected = 1,
	mosq_cs_disconnecting = 2,// mosquitto_disconnect时设置
	mosq_cs_active = 3,
	mosq_cs_connect_pending = 4,//没用到
	mosq_cs_connect_srv = 5,
	mosq_cs_disconnect_ws = 6,
	mosq_cs_disconnected = 7,
	mosq_cs_socks5_new = 8,
	mosq_cs_socks5_start = 9,
	mosq_cs_socks5_request = 10,
	mosq_cs_socks5_reply = 11,
	mosq_cs_socks5_auth_ok = 12,
	mosq_cs_socks5_userpass_reply = 13,
	mosq_cs_socks5_send_userpass = 14,
	mosq_cs_expiring = 15,
	mosq_cs_duplicate = 17, /* client that has been taken over by another with the same id */
	mosq_cs_disconnect_with_will = 18,
	mosq_cs_disused = 19, /* client that has been added to the disused list to be freed */
	mosq_cs_authenticating = 20, /* Client has sent CONNECT but is still undergoing extended authentication */
	mosq_cs_reauthenticating = 21, /* Client is undergoing reauthentication and shouldn't do anything else until complete */
};

数据包、数据包队列:发送数据(组包后)或者接受数据后(解包前)状态

struct mosquitto__packet{
	uint8_t *payload;
	struct mosquitto__packet *next;
	uint32_t remaining_mult;
	uint32_t remaining_length;
	uint32_t packet_length;
	uint32_t to_process;//发送进度,记录还未发送多少字节,缺省为packet_length
	uint32_t pos;//组包或者发送时用到,发送时记录发送到什么位置
	uint16_t mid;//消息id,当Qos==0 时回调on_publish时用
	uint8_t command;
	int8_t remaining_count;
};

消息队列 专指用户消息(包PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP)

struct mosquitto_message_all{
	struct mosquitto_message_all *next;//下一个
	struct mosquitto_message_all *prev;//上一个
	mosquitto_property *properties;//属性
	time_t timestamp;//时间戳,记录本地软件tick时间
	//enum mosquitto_msg_direction direction;
	enum mosquitto_msg_state state;//状态,比如publish报文头的发送、待收到等
	bool dup;
	struct mosquitto_message msg;//消息的message主题部分结构体,mid,topic,payload,payloadlen,qos,retain,expiry_interval
};

主要处理收发消息时的缓存队列
注:

  • 该队列与数据包队列没有直接关系;
  • 数据包队列为网络层发送数据策略;
  • 该队列为协议层处理逻辑;
struct mosquitto_msg_data{ 
#ifdef WITH_BROKER
	struct mosquitto_client_msg *inflight;
	struct mosquitto_client_msg *queued;
	unsigned long msg_bytes;
	unsigned long msg_bytes12;
	int msg_count;
	int msg_count12;
#else
	struct mosquitto_message_all *inflight; //对于Qos>0的消息,记录没有完成交互记录
	int queue_len;
#  ifdef WITH_THREADING
	pthread_mutex_t mutex;
#  endif
#endif
	int inflight_quota;//队列下标
	uint16_t inflight_maximum;//队列的最大值
};

四、部分常用函数内部含义

int mosquitto_lib_version(int *major, int *minor, int *revision) 查看mosquitto源码的系统版本号

int mosquitto_lib_init(void) 初始化需要的网络资源

int mosquitto_lib_cleanup(void) 将mosquitto_lib_init函数开启的各项服务关闭,释放一些使用到的内存空间

struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata) 给struct mosquitto *mosq指针分配资源。再mosquitto_reinitialise,也就是给结构体指针里面的变量重新赋初始默认值

void mosquitto_destroy(struct mosquitto *mosq) 释放线程资源,摧毁线程,释放上下文中的资源

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章