Rocketmq入门级教程,大厂必备的MQ技术,要进大厂一定得会这个
时间:2023-08-16 06:07:00
系统架构
- NameServer是一个Broker与Topic路由注册中心,支持Broker动态注册和发现。
RocketMQ思想来自Kafka,而Kafka是依赖了Zookeeper所以,在RocketMQ早期版本,即在
MetaQ v1.0与v2.0版本也依赖于Zookeeper的。从MetaQ v3.0,即RocketMQ开始去掉了
Zookeeper依赖,用自己的NameServer。
- 主要包括两个功能:
Broker管理:接受Broker集群注册信息作为路由信息的基本数据保存;提供心跳检测
机制,检查Broker是否还活着。
路由信息管理:每个NameServer中都保存着Broker客户端查询集群的整个路由信息和队列
信息。Producer和Conumser通过NameServer可获得整个Broker集群路由信息被消除
投递和消费利息。
路由注册
- NameServer但是,NameServer无状态,即NameServer集群中的各
每个节点之间没有区别,每个节点之间没有信息通信。每个节点中的数据是如何同步的?
Broker节点启动时,轮询NameServer列表,每一个NameServer节点建立长连接,发起注册请求。
NameServer内部维护Broker用于动态存储的列表Broker如图所示:
这与其他注册中心集群非常不同。Broker只需连接注册中心即可。.
- 优点:NameServer由于集群之间没有通信,集群很容易建立.
- 缺点:虽然集群很容易建立,但每次都有一个扩展NameServer都需要将自己的地址指出,而且与Broker建立新的连接也需要修改Broker内部接口,所以注册中心的集群不是随意扩大的.
- Broker为了证明自己还活着,为了维护和NameServer之间的长连接将使心跳包中的最新信息
方式上报给NameServer,心跳每30秒发送一次。心跳包含 BrokerId、Broker地址(IP Port)、 Broker名称、Broker所属集群名称等等。NameServer收到心跳包后,会更新心跳时间戳,记录这个 个Broker最新生存时间。
路由剔除
- 由于Broker关机、停机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer也许会把它从Broker删除列表。
- NameServer每10秒扫描一次定时任务Broker表,查看每一个Broker最新心跳时间
戳距离当前时间是否超过120秒,如果超过,将确定Broker失效,然后将其从Broker删除列表。
扩展:对于_RocketMQ_例如,日常运维工作_Broker_升级,需要停止_Broker_的工作。_OP_需要怎么
做?
_OP_需要将_Broker_禁止阅读和写作权限。_client(Consumer_或_Producer)_向_broker_发送请求时,会收到
到_broker_的_NO_PERMISSION(无权限)_响应,然后_client_其他的都会进行_Broker_的重试。
当_OP_观察到这个_Broker_没有流量后,关闭它,实现它_Broker_从_NameServer_的移除。
OP:运维工程师
SRE:Site Reliability Engineer,现场可靠性工程师
路由发现
- RocketMQ路由发现采用Pull模型。当Topic当路由信息发生变化时,NameServer不会主动推送给
客户端,但客户端定期拉主题的最新路由。默认情况下,客户端每30秒拉最新路由一次。
扩展:
1)Push_模型:推送模型。实际上,时性更好,是一个“发布-订阅”_模型需要长连接。
维护长连接需要资源成本。该模型适用于场景:
实时性要求高
_Client_数量少(数量多,长链接多),_Server_数据变化频繁
2)_Pull_模型:提取模型。问题是实时性差,定期提取,可能无法在多个时间段(这些时间段内路由信息没有变化)提取新数据。
3)_Long Polling_模型:长轮询模型。_Push_与_Pull_模型的集成充分利用了这两种模型的优势
势,屏蔽了它们的缺点。
客户端选择NameServer
- 这里的客户端是指消费者和生产者
- 配置客户端时,必须写上NameServer集群地址,那么客户端连接?NameServer节点
呢?客户端会先生产一个随机数,然后与之合作NameServer节点数量取模,此时需要连接
节点索引,然后连接。如果连接失败,将使用它round-robin一个接一个地尝试连接策略
点。 首先是随机策略的选择,失败后是轮询策略。
扩展:Zookeeper Client_是如何选择_Zookeeper Server_的?
简单来说,两次之后_Shuf? e,然后选择第一个_Zookeeper Server_。
具体来说,在配置文件中_zk server_第一次地址_shuf? e_,然后随机选择一个。选择这个
一般都是一个_hostname_。然后得到_hostname_对应的所有_ip_,再对这些_ip_进行第二次
shuf? e,从_shuf? e_取第一个结果_server_连接地址。
Broker
- Broker负责存储和转发色,负责存储和转发新闻。Broker在RocketMQ系统负责接收和存储系统
生产者发送来的消息,同时为消费者的拉取请求作准备。Broker还存储与新闻相关的元数据,包括
消费者消费进度偏移offset、主题、队列等。 _Kafka 0.8_版本之后,_offset_是存放在_Broker_中间,之前的版本存储在_Zookeeper_中的。
- Remoting Module:整个Broker负责处理来自实体的实体clients终端请求。而这个Broker实体由以下模块组成。
- Client Manager:客户端管理器。负责接收和分析客户端(Producer/Consumer)请求,管理客户端。 如,维护Consumer的Topic订阅信息
- Store Service:存储服务。提供方便简单的服务。API接口,将信息存储在物理硬盘和信息查询功能中。
- HA Service:提供高可用服务Master Broker 和 Slave Broker数据同步功能之间。
- Index Service:索引服务。根据具体情况Message key,对投递到Broker索引服务也提到了新闻 供根据Message Key快速查询消息的功能。
- 为了增强Broker性能和吞吐量,Broker一般以集群的形式出现。相同的集群节点可能存储在每个集群节点中 Topic的不同Queue。然而,如果某某,这里有一个问题Broker如何保证,如何保证数据不丢失?其解决 方案是,每一个Broker集群节点水平扩展,即将到来Broker再建一个节点HA集群,解决单点问题。
- Broker节点集群是主从集群,即集群Master与Slave两种角色。Master负责处理读写操作请 求,Slave负责对Master备份中间的数据。当Master挂掉了,Slave自动切换Master去工作。所 以这个Broker集群是主备集群。一个Master可包含多个Slave,但一个Slave只能属于一个Master。 Master与Slave 的对应关系是通过指定相同的BrokerName、不同的BrokerId 来确定的。BrokerId为0表示Master,非0表示Slave。每个Broker与NameServer建立集群中所有节点的长连接,定期注册Topic信息到所有NameServer。
系统流程
具体流程
- 启动NameServer,NameServer启动后,开始监控端口,等待Broker、Producer、Consumer连接。
- 启动roker时,Broker会与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时 发送心跳包。
- 发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创
建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消
息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获
取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算 法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取 到路由 信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信 息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,
然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始 消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信 息不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
Topic创建模式
- 手动创建:集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量是相同的。Broker模式:该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以不同。
- 自动创建Topic时,默认采用的是Broker模式,会为每个Broker默认创建4个Queue。
读写队列
- 本质上同一个队列,但是逻辑上分的.
- 例如,创建Topic时设置的写队列数量为8,读队列数量为4,此时系统会创建8个Queue,分别是0 1 2 3 4 5 6 7。Producer会将消息写入到这8个队列,但Consumer只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的消息是不会被消费到的。
- 再如,创建Topic时设置的写队列数量为4,读队列数量为8,此时系统会创建8个Queue,分别是0 1 2 3 4 5 6 7。Producer会将消息写入到0 1 2 3 这4个队列,但Consumer只会消费0 1 2 3 4 5 6 7这8个队列中的消息,但是4 5 6 7中是没有消息的。此时假设Consumer Group中包含两个Consuer,Consumer1消费0 1 2 3,而Consumer2消费4 5 6 7。但实际情况是,Consumer2是没有消息可消费的。
- 也就是说,当读/写队列数量设置不同时,总是有问题的。那么,为什么要这样设计呢?
- 其这样设计的目的是为了,方便Topic的Queue的缩容。例如,原来创建的Topic中包含16个Queue,如何能够使其Queue缩容为8个,还不会丢失消息?可以动态修改写队列数量为8,读队列数量不变。此时新的消息只能写入到前8个队列,而消费都消费的却是16个队列中的数据。当发现后有只有8个Queue中的消息消费完毕后,就可以知道要缩容了,再将读队列数量动态设置为8。整个缩容过程,没有丢失任何消息。
- perm用于设置对当前创建Topic的操作权限:2表示只写,4表示只读,6表示读写。
安装
- 下载rocketmq-all-4.9.2-bin-release.zip,然后直接解压,传至虚拟机.
- 解压得到文件夹rocketmq-4.9.2,cd rocketmq-4.9.2进入该文件.
- 使用vim命令打开bin/runserver.sh文件。现将这些值修改为如下:
- 使用vim命令打开bin/runbroker.sh文件
启动并查看NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success…
启动Broker并查看
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success…
- 启动完成后,cd ~ 进入根目录,ll查看文件,会多出logs和store两个文件夹.
发送和接收
生产者
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= …
消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt…
关闭服务
sh bin/mqshutdown broker
The mqbroker(36695) is running…
Send shutdown request to mqbroker(36695) OK
sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running…
Send shutdown request to mqnamesrv(36664) OK
控制台的安装与启动
- RocketMQ有一个可视化的dashboard,通过该控制台可以直观的查看到很多数据。
- 下载地址:https://github.com/apache/rocketmq-externals/releases
- 修改其src/main/resources中的application.properties配置文件。
- 原来的端口号为8080,修改为一个不常用的
- 指定RocketMQ的name server地址
- 添加依赖
在解压目录rocketmq-console的pom.xml中添加如下JAXB依赖。
JAXB,Java Architechture for Xml Binding,用于_XML_绑定的_Java_技术,是一个业界标准,是一
项可以根据_XML Schema_生成_Java_类的技术。
<dependency>
<groupId>javax.xml.bindgroupId>
<artifactId>jaxb-apiartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>com.sun.xml.bindgroupId>
<artifactId>jaxb-implartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>com.sun.xml.bindgroupId>
<artifactId>jaxb-coreartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>javax.activationgroupId>
<artifactId>activationartifactId>
<version>1.1.1version>
dependency>
- 在rocketmq-console目录下运行maven的打包命令。
- 得到下图的jar才算成功
- 在target目录下进入cmd
- 访问 : localhost:7000
如果刚才执行过发送消息服务,这些消息便会出现在这个列表中
集群
- 在这个集群中,生产者和消费者的集群最容易配置,比如:只需要将每个生产者配置在同一个组里便形成了集群.
- NameServer这个集群也容易配置,因为NameServer集群里的NameServer之间不进行通讯.
复制与刷盘
刷盘
同步刷盘
- 将消息持久化到磁盘中才算成功
异步刷盘
- 将消息写入到内存,就可以回应说成功了,这里写入内存一般是写入内存的PageCache里,刷盘的操作要等到PageCache里堆积了一定量的消息才进行.
复制
同步复制
- 消息写入Master后,Slave同步完之后,回应才算成功
异步复制
- 消息写入Master后,就回应了,算成功,复制和写入异步进行.
对比
- 异步刷盘策略会降低系统的写入延迟,_RT_变小,提高了系统的吞吐量
集群模式
单模式
- 但模式只有一个master,不能解决单点故障问题/
多Master
- 没有Slave,性能最好,如果Master都配置了_RAID_磁盘阵列,由于该磁盘的特性,即使宕机,消息也不会丢失.
- 但是因为没有Slave,一旦宕机了,没有Slave继续Master的工作,里面的消息便不能被消费.
多Master多Slave模式-异步复制
- broker集群由多个master构成,每个master又配置了多个slave(在配置了RAID磁盘阵列的情况下,一
个master一般配置一个slave即可)。master与slave的关系是主备关系,即master负责处理消息的读写
请求,而slave仅负责消息的备份与master宕机后的角色切换
- 异步复制即前面所讲的复制策略中的异步复制策略,即消息写入master成功后,master立即向
producer返回成功ACK,无需等待slave同步数据成功。
- 该模式的最大特点之一是,当master宕机后slave能够自动切换为master。不过由于slave从master的同
步具有短暂的延迟(毫秒级),所以当master宕机后,这种异步复制方式可能会存在少量消息的丢失问
题。
- _Slave_从_Master_同步的延迟越短,其可能丢失的消息就越少.对于_Master_的_RAID_磁盘阵列,若使用的也是异步复制策略,同样也存在延迟问题,同样也可能会丢失消息。但_RAID_阵列的秘诀是微秒级的(因为是由硬盘支持的,所以其丢失的数据量会更少。
多Master多Slave模式-同步双写
- 该模式是多Master多Slave模式的同步复制实现。所谓同步双写,指的是消息写入master成功后,
master会等待slave同步数据成功后才向producer返回成功ACK,即master与slave都要写入成功后才会
返回成功ACK,也即双写。
- 该模式与异步复制模式相比,优点是消息的安全性更高,不存在消息丢失的情况。但单个消息的RT略
高,从而导致性能要略低(大约低10%)。
- 该模式存在一个大的问题:对于目前的版本,Master宕机后,Slave不会自动切换到Master。
最佳实践
- 一般会为Master配置RAID10磁盘阵列,然后再为其配置一个Slave。即利用了RAID10磁盘阵列的高
效、安全性,又解决了可能会影响订阅的问题。
- _RAID_磁盘阵列的效率要高于_Master-Slave_集群。因为_RAID_是硬件支持的。也正因为如此,
所以_RAID_阵列的搭建成本较高。
多_Master+RAID_阵列,与多_Master_多_Slave_集群的区别是什么?
多_Master+RAID_阵列,其仅仅可以保证数据不丢失,即不影响消息写入,但其可能会影响到
消息的订阅。但其执行效率要远高于多Master多Slave集群
多_Master_多_Slave_集群,其不仅可以保证数据不丢失,也不会影响消息写入。其运行效率要低
于多Master+RAID阵列
磁盘阵列(RAID)
- 1988 年美国加州大学伯克利分校的 D. A. Patterson 教授等首次在论文 “A Case of Redundant Array of Inexpensive Disks” 中提出了 RAID 概念 ,即廉价冗余磁盘阵列(Redundant Array of Inexpensive Disks )。由于当时大容量磁盘比较昂贵, RAID 的基本思想是将多个容量较小、相对廉价的磁盘进行 有机组合,从而以较低的成本获得与昂贵大容量磁盘相当的容量、性能、可靠性。随着磁盘成本和价格的不断降低, “廉价” 已经毫无意义。因此, RAID 咨询委员会( RAID Advisory Board, RAB )决定用 “ 独立 ” 替代 “ 廉价 ” ,于时 RAID 变成了独立磁盘冗余阵列(Redundant Array of Independent Disks )。但这仅仅是名称的变化,实质内容没有改变。
- RAID 这种设计思想很快被业界接纳, RAID 技术作为高性能、高可靠的存储技术,得到了非常广泛的 应用。 RAID 主要利用镜像、数据条带和数据校验三种技术来获取高性能、可靠性、容错能力和扩展性,根据对这三种技术的使用策略和组合架构,可以把 RAID 分为不同的等级,以满足不同数据应用的需求。
- D. A. Patterson 等的论文中定义了 RAID0 ~ RAID6 原始 RAID 等级。随后存储厂商又不断推出 RAID7 、 RAID10、RAID01 、 RAID50 、 RAID53 、 RAID100 等 RAID 等级,但这些并无统一的标准。目前 业界与学术界公认的标准是 RAID0 ~ RAID6 ,而在实际应用领域中使用最多的 RAID 等级是 RAID0 、 RAID1 、 RAID3 、 RAID5 、 RAID6 和 RAID10。
- RAID 每一个等级代表一种实现方法和技术,等级之间并无高低之分。在实际应用中,应当根据用户的 数据应用特点,综合考虑可用性、性能和成本来选择合适的 RAID 等级,以及具体的实现方式。
关键技术
镜像技术
- 镜像技术是一种冗余技术,为磁盘提供数据备份功能,防止磁盘发生故障而造成数据丢失。对于 RAID 而言,采用镜像技术最典型地的用法就是,同时在磁盘阵列中产生两个完全相同的数据副本,并且分布在两个不同的磁盘上。镜像提供了完全的数据冗余能力,当一个数据副本失效不可用时,外部系统仍可正常访问另一副本,不会对应用系统运行和性能产生影响。而且,镜像不需要额外的计算和校验,故障修复非常快,直接复制即可。镜像技术可以从多个副本进行并发读取数据,提供更高的读 I/O 性能,但不能并行写数据,写多个副本通常会导致一定的 I/O 性能下降。 镜像技术提供了非常高的数据安全性,其代价也是非常昂贵的,需要至少双倍的存储空间。高成本限制了镜像的广泛应用,主要应用于至关重要的数据保护,这种场合下的数据丢失可能会造成非常巨大的损失。
数据条带技术
- 数据条带化技术是一种自动将 I/O操作负载均衡到多个物理磁盘上的技术。更具体地说就是,将一块连续的数据分成很多小部分并把它们分别存储到不同磁盘上。这就能使多个进程可以并发访问数据的多个不同部分,从而获得最大程度上的 I/O 并行能力,极大地提升性能。
数据校验技术
- 数据校验技术是指, RAID 要在写入数据的同时进行校验计算,并将得到的校验数据存储在 RAID 成员 磁盘中。校验数据可以集中保存在某个磁盘或分散存储在多个不同磁盘中。当其中一部分数据出错时就可以对剩余数据和校验数据进行反校验计算重建丢失的数据。
- 数据校验技术相对于镜像技术的优势在于节省大量开销,但由于每次数据读写都要进行大量的校验运 算,对计算机的运算速度要求很高,且必须使用硬件 RAID 控制器。在数据重建恢复方面,检验技术比镜像技术复杂得多且慢得多。
RAID等级
JBOD
JBOD ,Just a Bunch of Disks,磁盘簇。表示一个没有控制软件提供协调控制的磁盘集合,这是 RAID
区别与 JBOD 的主要因素。 JBOD 将多个物理磁盘串联起来,提供一个巨大的逻辑磁盘。
JBOD 的数据存放机制是由第一块磁盘开始按顺序往后存储,当前磁盘存储空间用完后,再依次往后面
的磁盘存储数据。 JBOD 存储性能完全等同于单块磁盘,而且也不提供数据安全保护。
其只是简单提供一种扩展存储空间的机制,_JBOD_可用存储容量等于所有成员磁盘的存储空间之
和
JBOD 常指磁盘柜,而不论其是否提供 RAID 功能。不过,JBOD并非官方术语,官方称为Spanning。
RAID0
RAID0 是一种简单的、无数据校验的数据条带化技术。实际上不是一种真正的 RAID ,因为它并不提
供任何形式的冗余策略。 RAID0 将所在磁盘条带化后组成大容量的存储空间,将数据分散存储在所有
磁盘中,以独立访问方式实现多块磁盘的并读访问。
理论上讲,一个由 n 块磁盘组成的 RAID0 ,它的读写性能是单个磁盘性能的 n 倍,但由于总线带宽等
多种因素的限制,实际的性能提升低于理论值。由于可以并发执行 I/O 操作,总线带宽得到充分利用。
再加上不需要进行数据校验,RAID0 的性能在所有 RAID 等级中是最高的。
RAID0 具有低成本、高读写性能、 100% 的高存储空间利用率等优点,但是它不提供数据冗余保护,一
旦数据损坏,将无法恢复。
应用场景:对数据的顺序读写要求不高,对数据的安全性和可靠性要求不高,但对系统性能要求很高的
场景。
RAID0_与_JBOD_相同点:
1)存储容量:都是成员磁盘容量总和
2)磁盘利用率,都是_100%,即都没有做任何的数据冗余备份
_RAID0_与_JBOD_不同点:
JBOD:数据是顺序存放的,一个磁盘存满后才会开始存放到下一个磁盘
RAID:各个磁盘中的数据写入是并行的,是通过数据条带技术写入的。其读写性能是_JBOD_的_n _
倍
RAID1
RAID1 就是一种镜像技术,它将数据完全一致地分别写到工作磁盘和镜像磁盘,它的磁盘空间利用率
为 50% 。 RAID1 在数据写入时,响应时间会有所影响,但是读数据的时候没有影响。 RAID1 提供了
最佳的数据保护,一旦工作磁盘发生故障,系统将自动切换到镜像磁盘,不会影响使用。
RAID1是为了增强数据安全性使两块磁盘数据呈现完全镜像,从而达到安全性好、技术简单、管理方
便。 RAID1 拥有完全容错的能力,但实现成本高。
应用场景:对顺序读写性能要求较高,或对数据安全性要求较高的场景。
RAID10(1和0)
RAID10是一个RAID1与RAID0的组合体,所以它继承了RAID0的快速和RAID1的安全。
简单来说就是,先做条带,再做镜像。发即将进来的数据先分散到不同的磁盘,再将磁盘中的数据做
镜像。
RAID01
RAID01是一个RAID0与RAID1的组合体,所以它继承了RAID0的快速和RAID1的安全。
简单来说就是,先做镜像再做条带。即将进来的数据先做镜像,再将镜像数据写入到与之前数据不同
的磁盘,即再做条带。
_因为RAID10是先进行条带,已经将数据写进去,后进行镜像,等于将数据备份了一个磁盘,所以当其中一个磁盘挂了,但这一条数据也不算丢失,所以RAID10_要比_RAID01_的容错率再高,所以生产环境下一般是不使用_RAID01_的。

集群安装
- 准备两台虚拟机,都配备RMQ的环境.
第一台机器
- 修改配置文件,要修改的配置文件在rocketMQ解压目录的conf/2m-2s-async目录中,修改broker-a.properties
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-a
# master的brokerId为0
brokerId=0
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=ASYNC_MASTER
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.188.100:9876;192.168.188.130:9876
- 修改同一目录下的broker-b-s.properties
brokerClusterName=DefaultCluster
# 指定这是另外一个master-slave集群
brokerName=broker-b
# slave的brokerId为非0
brokerId=1
deleteWhen=04
fileReservedTime=48
# 指定当前broker为slave
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
# 指定Broker对外提供服务的端口,
#即Broker与producer与consumer通信的端口。
#默认 10911。由于当前主机同时充当着master1与slave2,
#而前面的master1使用的是默认端口。这 里需要将这两个端口加以区分,
#以区分出master1与slave2
listenPort=11911
# 指定消息存储相关的路径。默认路径为~/store目录。
#由于当前主机同时充当着master1与 slave2,master1使用的是默认路径,
#这里就需要再指定一个不同路径
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
第二台机器
- 同样路径下,修改broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
- broker-a-s.properties:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
listenPort=11911
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
其他配置
#默认为新建Topic所创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议生产环境中关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议生产环境中关闭
autoCreateSubscriptionGroup=true
#Broker对外提供服务的端口,即Broker与producer与consumer通信的端口
listenPort=10911
#HA高可用监听端口,即Master与Slave间通信的端口,默认值为listenPort+1
haListenPort=10912
#指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
#指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
#指定commitLog目录中每个文件的大小,默认1G
mapedFileSizeCommitLog=1073741824
#指定ConsumeQueue的每个Topic的每个Queue文件中可以存放的消息数量,默认30w条
mapedFileSizeConsumeQueue=300000
#在清除过期文件时,如果该文件被其他线程所占用(引用数大于0,比如读取消息),此时会阻止 此次删除任务,同时在第一次试图删除该文件时记录当前时间戳。该属性则表示从第一次拒绝删除 后开始计时,该文件最多可以保留的时长。在此时间内若引用数仍不为0,则删除仍会被拒绝。不过 时间到后,文件将被强制删除
destroyMapedFileIntervalForcibly=120000
#指定commitlog、consumequeue所在磁盘分区的最大使用率,超过该值,则需立即清除过期文件
diskMaxUsedSpaceRatio=88
#指定store目录的路径,默认在当前用户主目录中
storePathRootDir=/usr/local/rocketmq-all-4.5.0/store
#commitLog目录路径
storePathCommitLog=/usr/local/rocketmq-all-4.5.0/store/commitlog
#consumeueue目录路径
storePathConsumeQueue=/usr/local/rocketmq-all-4.5.0/store/consumequeue
#index目录路径
storePathIndex=/usr/local/rocketmq-all-4.5.0/store/index
#checkpoint文件路径
storeCheckpoint=/usr/local/rocketmq-all-4.5.0/store/checkpoint
#abort文件路径
abortFile=/usr/local/rocketmq-all-4.5.0/store/abort
#指定消息的最大大小
maxMessageSize=65536
#Broker的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盘策略
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡 时IP地址可能读取错误
brokerIP1=192.168.3.105
启动
启动NameServer(两台机器启动命令相同)
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动两个Master
- 第一台机器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
- 第二台机器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
启动两个Slave
- 第一台机器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
- 第一台机器
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
mqadmin命令 (了解)
- 在mq解压目录的bin目录下有一个mqadmin命令,该命令是一个运维指令,用于对mq的主题,集群,broker 等信息进行管理。
该命令在官网中有详细的用法解释。
https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md
消息的产生过程
- Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
- NameServer返回该Topic的路由表及Broker列表
- Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息
- Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
- Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue
路由表:实际是一个_Map_,key_为_Topic_名称,value_是一个_QueueData_实例列表。QueueData_并不
是一个_Queue_对应一个_QueueData,而是一个_Broker_中该_Topic_的所有_Queue_对应一个
QueueData。即,只要涉及到该_Topic_的_Broker,一个_Broker_对应一个_QueueData。QueueData_中
包含_brokerName。简单来说,路由表的_key_为_Topic_名称,_value_则为所有涉及该_Topic_的
_BrokerName_列表。
Broker_列表:其实际也是一个_Map。key_为_brokerName,value_为_BrokerData。一个_Broker_对应一
个_BrokerData_实例,对吗?不对。一套_brokerName_名称相同的_Master-Slave_小集群对应一个
BrokerData。BrokerData_中包含_brokerName_及一个_map。该_map_的_key_为_brokerId_,_value_为该
broker_对应的地址。brokerId_为_0_表示该_broker_为_Master,非_0_表示_Slave。
Queue选择算法
轮询算法
默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。
该算法存在一个问题:由于某些原因,在某些_Broker_上的_Queue_可能投递延迟较严重。从而导致
_Producer_的缓存队列中出现较大的消息积压,影响消息的投递性能。
最小投递延迟算法
该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。
如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。
该算法也存在一个问题:消息在_Queue_上的分配不均匀。投递延迟小的_Queue_其可能会存在大量
的消息。而对该_Queue_的消费者压力会增大,降低消息的消费能力,可能会导致_MQ_中消息的堆
积。
消息存储
- RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中。
- abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动
- Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。
- checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
- commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
- conæg:存放着Broker运行期间的一些配置数据
- consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
- index:其中存放着消息索引文件indexFile
- lock:运行期间使用到的全局资源锁
**commitlog文件 **
- 说明:在很多资料中_commitlog_目录中的文件简单就称为_commitlog_文件。但在源码中,该文件
被命名为_mappedFile_。
- commitlog目录中存放着很多的mappedFile文件,当前Broker中的所有消息都是落盘到这些
mappedFile文件中的。mappedFile文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示
当前文件的第一条消息的起始位移偏移量。
第一个文件名一定是_20_位_0_构成的。因为第一个文件的第一条消息的偏移量_commitlog offset_为_0 _
当第一个文件放满时,则会自动生成第二个文件继续存放消息。假设第一个文件大小(总大小为1G)是
_1073741820_字节(1G = 1073741824_字节),则第二个文件名就是_00000000001073741820。
以此类推,第_n_个文件名应该是前_n-1_个文件大小之和。
一个_Broker_中所有_mappedFile_文件的_commitlog offset_是连续的
- 需要注意的是,一个Broker中仅包含一个commitlog目录,所有的mappedFile文件都是存放在该目录中的。即无论当前Broker中存放着多少Topic的消息,这些消息都是被顺序写入到了mappedFile文件中 的。也就是说,这些消息在Broker中存放时并没有被按照Topic进行分类存放。
_mappedFile_文件是顺序读写的文件,所有其访问效率很高
无论是_SSD_磁盘还是_SATA_磁盘,通常情况下,顺序存取效率都会高于随机存取。
消息单元
mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理
位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度
TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消
息在Queue中存储的偏移量QueueOffset等近20余项消息相关属性。
需要注意到,消息单元中是包含_Queue_相关属性的。所以,我们在后续的学习中,就需要十分
留意_commitlog_与_queue_间的关系是什么?一个_mappedFile_文件中第_m+1_个消息单元的_commitlog offset_偏移量 为: L(m+1) = L(m) + MsgLen(m) (m >= 0)
目录与文件
为了提高效率,会为每个Topic在~/store/consumequeue中创建一个目录,目录名为Topic名称。在该
Topic目录下,会再为每个该Topic的Queue建立一个目录,目录名为queueId。每个目录中存放着若干
consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具
体的消息。
consumequeue文件名也由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量。与
mappedFile文件名不同的是,其后续文件名是固定的。因为consumequeue文件大小是固定不变的。
索引条目
每个consumequeue文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性:消息在
mappedFile文件中的偏移量CommitLog Offset、消息长度、消息Tag的hashcode值。这三个属性占20
个字节,所以每个文件的大小是固定的30w * 20字节。
一个_consumequeue_文件中所有消息的_Topic_一定是相同的。但每条消息的_Tag_可能是不同的。
消息写入
一条消息进入到Broker后经历了以下几个过程才最终被持久化。
Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即
QueueOffset
将queueId、queueOffset等数据,与消息一起封装为消息单元
将消息单元写入到commitlog
同时,形成消息索引条目
将消息索引条目分发到相应的consumequeue
消息拉取
当Consumer来拉取消息时会经历以下几个步骤:
Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的
消息offset
消费_offset_即消费进度,consumer_对某个_Queue_的消费_offset,即消费到了该_Queue_的第几
条消息
消息_offset = _消费_offset + 1_Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息
Tag。
Broker计算在该consumequeue中的queueOffset。
_queueOffset = _消息_offset * 20_字节
从该queueOffset处开始向后查找第一个指定Tag的索引条目。
解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset
从对应commitlog offset中读取消息单元,并发送给Consumer
性能提升
- RocketMQ中,无论是消息本身还是消息索引,都是存储在磁盘上的。其不会影响消息的消费吗?当然 不会。其实RocketMQ的性能在目前的MQ产品中性能是非常高的。因为系统通过一系列相关机制大大提升了性能。
- 首先,RocketMQ对文件的读写操作是通过mmap零拷贝进行的,将对文件的操作转化为直接对内存地 址进行操作,从而极大地提高了文件的读写效率。
- 其次,consumequeue中的数据是顺序存放的,还引入了PageCache的预读取机制,使得对 consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。
_PageCache_机制,页缓存机制,是_OS_对文件的缓存机制,用于加速对文件的读写操作。一般来
说,程序对文件进行顺序读写的速度几乎接近于内存读写速度,主要原因是由于_OS_使用
PageCache_机制对读写访问操作进行性能优化,将一部分的内存用作_PageCache。
写操作:OS_会先将数据写入到_PageCache_中,随后会以异步方式由_pdæ ush(
_page dirty æ ush) _
内核线程将_Cache_中的数据刷盘到物理磁盘
读操作:若用户要读取数据,其首先会从_PageCache_中读取,若没有命中,则_OS_在从物理磁
盘上加载该数据到_PageCache_的同时,也会顺序对其相邻数据块中的数据进行预读取。
- RocketMQ中可能会影响性能的是对commitlog文件的读取。因为对commitlog文件来说,读取消息时 会产生大量的随机访问,而随机访问会严重影响性能。不过,如果选择合适的系统IO调度算法,比如 设置调度算法为Deadline(采用SSD固态硬盘的话),随机读的性能也会有所提升。
与Kafka对比
- RocketMQ的很多思想来源于Kafka,其中commitlog与consumequeue就是。
- RocketMQ中的commitlog目录与consumequeue的结合就类似于Kafka中的partition分区目录。 mappedFile文件就类似于Kafka中的segment段。
Kafka_中的_Topic_的消息被分割为一个或多个_partition。partition_是一个物理概念,对应到系统上
就是_topic_目录下的一个或多个目录。每个_partition_中包含的文件称为_segment,是具体存放消息
的文件。
_Kafka_中消息存放的目录结构是:_topic_目录下有_partition_目录,_partition_目录下有_segment_文件
_Kafka_中没有二级分类标签_Tag_这个概念
_Kafka_中无需索引文件。因为生产者是将消息直接写在了_partition_中的,消费者也是直接从
_partition_中读取数据的
indexFile
- 除了通过通常的指定Topic进行消息消费外,RocketMQ还提供了根据key进行消息查询的功能。该查询 是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索 引数据是在包含了key的消息被发送到Broker时写入的。如果消息中没有包含key,则不会写入。
索引条目录结构
- 每个Broker中会包含一组indexFile,每个indexFile都是以一个时间戳命名的(这个indexFile被创建时 的时间戳)。每个indexFile文件由三部分构成:indexHeader,slots槽位,indexes索引数据。每个 indexFile文件中包含500w个slot槽。而每个slot槽又可能会挂载很多的index索引单元。
- indexHeader固定40个字节,其中存放着如下数据:
beginTimestamp:该indexFile中第一条消息的存储时间
endTimestamp:该indexFile中最后一条消息存储时间
beginPhyoffset:该indexFile中第一条消息在commitlog中的偏移量commitlog offset
endPhyoffset:该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
hashSlotCount:已经填充有index的slot数量(并不是每个slot槽下都挂载有index索引单元,这
里统计的是所有挂载了index索引单元的slot槽的数量)
indexCount:该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所
有index索引单元的数量之和)
- indexFile中最复杂的是Slots与Indexes间的关系。在实际存储时,Indexes是在Slots后面的,但为了便
于理解,将它们的关系展示为如下形式:
- key的hash值 % 500w的结果即为slot槽位,然后将该slot值修改为该index索引单元的indexNo,根
据这个indexNo可以计算出该index单元在indexFile中的位置。不过,该取模结果的重复率是很高的,
为了解决该问题,在每个index索引单元中增加了preIndexNo,用于指定该slot中当前index索引单元的
前一个index索引单元。而slot中始终存放的是其下最新的index索引单元的indexNo,这样的话,只要
找到了slot就可以找到其最新的index索引单元,而通过这个index索引单元就可以找到其之前的所有
index索引单元。
_indexNo_是一个在_indexFile_中的流水号,从_0_开始依次递增。即在一个_indexFile_中所有_indexNo_是
以此递增的。_indexNo_在_index_索引单元中是没有体现的,其是通过_indexes_中依次数出来的。
- index索引单元默写20个字节,其中存放着以下四个属性:
keyHash:消息中指定的业务key的hash值
phyOffset:当前key对应的消息在commitlog中的偏移量commitlog offset
timeDiff:当前key对应消息的存储时间与当前indexFile创建时间的时间差
preIndexNo:当前slot下当前index索引单元的前一个index索引单元的indexNo
** indexFile的创建**
indexFile的文件名为当前文件被创建时的时间戳。这个时间戳有什么用处呢?
根据业务key进行查询时,查询条件除了key之外,还需要指定一个要查询的时间戳,表示要查询不大于
该时间戳的最新的消息,即查询指定时间戳之前存储的最新消息。这个时间戳文件名可以简化查询,提
高查询效率。具体后面会详细讲解。
indexFile文件是何时创建的?其创建的条件(时机)有两个:
当第一条带key的消息发送来后,系统发现没有indexFile,此时会创建第一个indexFile文件
当一个indexFile中挂载的index索引单元数量超出2000w个时,会创建新的indexFile。当带key的
消息发送到来后,系统会找到最新的indexFile,并从其indexHeader的最后4字节中读取到
indexCount。若indexCount >= 2000w时,会创建新的indexFile。
由于可以推算出,一个_indexFile_的最大大小是:_(40[indexHeader] + 500w * 4 [slots]+ 2000w * 20[index])_字节
最多有五百万个slots,每个四个字节;一个slost最多可以挂在四个index,每个index最多占20个字节.
查询流程
- 当消费者通过业务key来查询相应的消息时,其需要经过一个相对较复杂的查询流程。不过,在分析查询流程之前,首先要清楚几个定位计算式子:
计算指定消息key的slot槽位序号:
slot槽位序号 = key的hash % 500w
计算槽位序号为n的slot在indexFile中的起始位置:(这是起始位置,所以是计算上一个槽位)
slot(n)位置 = 40 + (n - 1) * 4
计算indexNo为m的index在indexFile中的起始位置:
index(m)位置 = 40 + 500w * 4 + (m - 1) * 20
_40_为_indexFile_中_indexHeader_的字节数
_500w * 4 _是所有_slots_所占的字节数
消费模式
- 广播模式:该模式下,在同一个消费者组里的消费者都能消费同一个消息.
- 集群模式:该模式下,一个消息只能被同一个消费者的某个消费者消费.
消息进度保存
- 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会
消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度。
- 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic
中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是
需要共享的。下图是broker中存放的各个Topic的各个Queue的消费进度。
**Rebalance机制 **
- Rebalance机制讨论的前提是:集群消费。
简介
- Rebalance即再均衡,指的是,将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个
Consumer间进行重新分配的过程。
- Rebalance机制的本意是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费
者的情况下,这