RocketMQ学习笔记
时间:2023-09-02 03:07:01
RocketMQ学习笔记
文章目录
- RocketMQ学习笔记
-
- 前言
-
- 为什么要使用消息队列?
-
- 解耦
- 异步
- 削峰
- 使用消息队列有什么缺点?
- 如何选择消息队列?
- 如何保证消息队列的高可用性?
- 如何保证消息不被重复消费?
- 如何保证消费的可靠性传输?
- 如何保证消息的顺序?
- 简介
-
- 什么是MQ?
- MQ用途
-
- 削峰限流
- 异步解耦
- 数据收集
- 常见的MQ协议
- 基本概念
- 系统架构
-
- 1.Producer生产者
- 2.Consumer消费者
- 3.Name Server注册中心
-
- 路由注册
- 路由剔除
- 路由发现
- 客户端NameServer选择策略
- 4.Broker中间人
-
- 功能介绍
- 模块构成
- 集群部署
- 5.工作流程
-
- 阅读/写队列问题
- 安装和启动单机
-
- 准备工作
-
- 硬件
- 环境
- linux防火墙开口
- 下载RocketMQ
- 修改启动内存
- Linux下启动rocketmq
-
- Start Name Server
- Start Broker
- Send & Receive Messages
- Shutdown Servers
- rocketmq控制台
-
- 修改配置
- 导入依赖
- 打包启动
- 测试访问message
- 集群的搭建
-
- 数据复制和刷盘策略
-
- 复制策略
- 刷盘策略
- Broker集群模式
-
- 单master
- 多master
- 多master多slave模式-异步复制
- 多master多slave模式-同步双写
- 最佳方案
- 集群建设实践
-
- 配置
- 启动
-
- 启动nameserver
- 启动两个master
- 启动两个slave
- 修改控制台配置
- mqadmin命令
- RocketMQ工作原理
-
- 消息的生产
-
- 制作新闻的过程
- Queue选择算法
- 消息的储存
- indexFile
- 消息的消费
-
- 推拉获取消费类型
- 消费模式
- Rebalance机制
- Queue分配算法
- 原则至少一次
- 订阅关系的一致性
-
- 正确订阅关系
- 订阅关系错误
- offset管理
-
- offset本地管理模式
- offset远程管理模式
- offset用途
- 重试队列
- 集群模式下offset同步提交和异步提交
- 消费幂等
-
- 什么是消费幂等等?
- 重复新闻的场景分析
- 一般解决方案
- 实现消费幂等
- 消息积累和消费延迟
-
- 概念
- 消息拉取
- 消费耗时
- 消费并发度
- 消息的清理
- RocketMQ应用
-
- 一、普通消息
-
- 同步发送消息
- 异步发送消息
- 单向发送消息
- 消费者消费代码示例
- 二、顺序消息
-
- 什么是顺序消息?
- 为什么需要顺序消息?
- 新闻的有序分类
- 三、延迟消息
-
- 什么是延时消息?
- 延时等级
- 延时消息原理
- 原理总结
- 延时消费代码示例
- 四、事务消息
-
- 问题引入
- 解决思路
- 基础
- XA三剑客
- 注意事项
- 事务消息代码示例
- 五、批量消息
-
- 批量发送
- 批量消费
- 六、消息过滤
-
- Tag过滤
- SQL过滤
- 代码示例
- 七、消息发送重试机制
-
- 什么是消息发送重试机制
- 同步发送失败策略
- 异步发送失败策略
- 消息刷盘发送失败策略
- 八、消息消费重试机制
-
- 顺序消息的消费重试
- 无序消息的消费重试
- 消费重试次数与间隔
- 重试队列
- 消费重试的配置方式
- 消费不重试配置文件
- 九、死信队列
-
- 什么是死信队列
- 死信队列的特征
- 死信队列的处理
前言
以下前言的几个问题选自博主割肉机的博文:
一个用消息队列 的人,不知道为啥用 MQ,这就有点尴尬
本文学习参考自B站尚硅谷的【尚硅谷】2021新版RocketMQ教程丨深度掌握MQ消息中间件
下文的所以代码工程都放在了我的gitee仓库中rocketmq-study
为什么要使用消息队列?
解耦
传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改
异步
传统模式的缺点:
一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式的的优点:
将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰
传统模式的缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的
使用了消息队列会有什么缺点?
我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防
从以下两个个角度来答
- 系统可用性降低:本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是挂了。因此,系统可用性降低
- 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。
但是,我们该用还是要用的。
消息队列如何选型?
特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms级 | us级 | ms级 | ms级以内 |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广 |
综合上面的材料得出以下两点:
- 中小型软件公司,建议选RabbitMQ
- 一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。
- 正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。
- 不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。
- 不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。
- 大型软件公司,根据具体使用在rocketMq和kafka之间二选一
- 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的
- 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景
如何保证消息队列是高可用的?
在第二点说过了,引入消息队列后,系统的可用性下降。
在生产中,没人使用单机模式的消息队列。
因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。
如果面试的时候,面试官问,你们的消息中间件如何保证高可用的?
你的回答只是表明自己只会订阅和发布消息,面试官就会怀疑你是不是只是自己搭着玩,压根没在生产用过。
如何保证消息不被重复消费?
这个问题其实换一种问法就是,如何保证消息队列的幂等性?
这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。
先来说一下为什么会造成重复消费?
其实无论是那种消息队列,造成重复消费原因其实都是类似的。
- 正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。
- 只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
那造成重复消费的原因?
- 就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?
这个问题针对业务场景来答分以下几点
- 比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
- 如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。
- 以redis为例,给消息分配一个全局id,只要消费过该消息,将
以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
- 以redis为例,给消息分配一个全局id,只要消费过该消息,将
如何保证消费的可靠性传输?
我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。
如果无法做到可靠性传输,可能给公司带来千万级别的财产损失。同样的,如果可靠性传输在使用过程中,没有考虑到,这不是给公司挖坑么,你可以拍拍屁股走了,公司损失的钱,谁承担?
还是那句话,认真对待每一个项目,不要给公司挖坑。
其实这个可靠性传输,每种MQ都要从三个角度来分析:
- 生产者弄丢数据
- 消息队列弄丢数据
- 消费者弄丢数据
如何保证消息的顺序性?
其实并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。
针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。
然后只用一个消费者去消费该队列。
有的人会问:**那如果为了吞吐量,有多个消费者去消费怎么办? **
这个问题,没有固定回答的套路。
比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。
如果是这样一个业务场景,那只要重试就行。
比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。
总之,针对这个问题,我的观点是**保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。 **
简介
什么是MQ?
MQ,message queue消息队列,一种提供消息队列服务的中间件,是一套提供了消息生产、储存、消费全过程的API软件系统
消息即数据
MQ用途
削峰限流
MQ可以将系统的超量请求暂存其中,以便系统慢慢处理,避免请求丢失或系统被压垮
异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一一个MQ层。
后台保证了数据库一定能正常修改运行,所以我无序同步等待执行结束,而是直接在用户请求后返回订单号,接着异步去处理数据库
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等
针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。
常见的MQ协议
MOM是什么?
MOM(Message Oriented Middleware)是面向消息的中间件,使用消息传送提供者来协调消息传送操作。MOM 需要提供 API 和管理工具。客户端使用api调用,把消息发送到由提供者管理的目的地。在发送消息之后,客户端会继续执行其他工作,并且在接收方收到这个消息确认之前,提供者一直保留该消息
- PO:面向过程,Process Oriented
- OO:面向对象,Object Oriented
- AO:面向切面,Aspect Oriented
JMS
java message service,java消息服务
是java平台有关MOM的技术规范,它便于消息系统中的java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。
ActiveMQ是该协议的典型实现
STOMP
Stream Text Orientated Message Protocol,面向文本流的消息协议
是一种MOM设计的简单文本协议,STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
ActiveMQ是该协议的典型实现,RabbitMQ通过插件可以支持该协议
AMQP
Advanced Message Queuing Protocol,高级消息队列协议
一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种MOM设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
RabbitMQ是该协议的典型实现。
MQTT
Message Queuing Telemetry Transport,消息队列遥测传输
是IBM开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗IoT设备(物联网)间的通信。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。
RabbitMQ通过插件可以支持该协议。
以上协议,RockerMQ都不支持
基本概念
消息与主题
消息(Message)
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
主题(Topic)
topic表示一类消息的集合,每个主题包含若干个消息,每条消息只属于一个主题,是对rocketmq进行消息订阅的基本单位
一个生产者可以发送多种topic的消息,而一个消费者只对某种特定的topic感兴趣,即只可以订阅和消费一种topic的消息
标签(Tags)
消息标签,用来进一步区分某个 Topic 下的消息分类,消息队列 RocketMQ 允许消费者按照 Tag 对消息进行过滤,确保消费者最终只消费到他关注的消息类型
Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以说是二级分类,关系如图所示
队列(Queue)
储存消息的实体,一个topic中可以含有多个queue,每个queue中储存的是该topic的信息
一个topic的queue也被称为一个topic中消息的分区
一个topic的queue的消息只能被一个消费者组中的消费者进行读取消费,且一个queue的消息不允许同一个消费者组里多个消费者同时消费(后面说的广播模式推翻了这个说法)
queue:消费者 => 1:1
消费者:queue => 1:n
为了提升MQ的效率,所以将topic分为多个分区去进行读取(个人感觉有点像负载均衡)
注:1号狗消费了1号骨头,就不能和2号狗去争夺2号骨头了
如果此时只有1和2号两只狗的时候,就可以让1号消费1、2号骨头,而2号狗消费3号骨头了
就是说一个消费者组里不能互相争夺分区的消息
在学习参考其它相关资料时,还会看到一个概念:分片(Sharding)
官方是没有这个概念的,但是在网上有这种说法
分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker的数量。
每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。
消息标识(MessageId/Key)
- RocketMQ中每个消息拥有唯一的MessageId, 且可以携带具有业务标识的Key,以方便对消息的查询
- 不过需要注意的是,MessageId有两个: 在生产者send0消息时会自动生成一个MessageId (msgId), 当消息到达Broker后,Broker也会自动生成一 个Messageld(称为offsetMsgId)
- msgId、 offsetMsgId与key都称为消息标识
- msgId:由produce端生成,生成规则为
- producerIp+进程Pid+MessageClientSetter类的ClassLoader的hashCode+当前时间+AutomiInteger自增器
- offsetMsgId:由broker端生成,生成规则为
- brokerIp+物理分区的offset(也就是queue的偏移量)
- key:由用户指定的业务相关的唯一标识
- msgId:由produce端生成,生成规则为
系统架构
1.Producer生产者
消息生产者,负责生产消息
比如我将系统的日志写入MQ,就是一个消息的生成
将用户的请求写入MQ,也是一个消息的生成
Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。
生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息(我的理解是可以生产多个主题的消息,但是发送时要保持消息的主题是一致的)
2.Consumer消费者
消息消费者,负责消费消息
从MQ取出日志信息进行解析,就是消息的消费
用户下单后,从MQ中读取用户的请求并进行处理的过程,也是消息的消费过程
一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的
消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,一类消费者只能消费一种topic的消息
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易
- 负载均衡:将一个topic中不同的queue分配给不同的consumer,看来我前面说很像负载均衡说对了
- 容错:一个consumer挂了,还有别的consumer可以接着消费(分布式微服务容错机制)
消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。
一个topic类型的消息可以被多个消费者组消费
注:
- 一个消费者组里消费者必须订阅完全相同的topic
- 一个消费者组只能消费一个topic的消息,不能同时消费多个topic的消息
3.Name Server注册中心
NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
主要包括两个功能:
-
Broker管理:
- 接受Broker集群的注册信息并且保存下来作为路由信息的基本数据
- 提供心跳检测机制,检查Broker是否还存活。
-
路由信息管理:
- 每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。
- Producer和Consumer通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
路由注册
NameServer通常也是以集群的方式部署,不过, NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯
那各节点中的数据是如何进行数据同步的呢?
在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。
在NameServer内部维护着1个Broker列表, 用来动态存储Broker的信息。
注意:
与其他zk、euraka、nacos等注册中心不同,别的都是注册一个然后内部通信,nameserver是所有都要注册
优点:无状态的集群搭建很简单
缺点:对于broker必须明确指出nameserver地址,未指出的不会进行注册,使得扩容、维护不方便
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。
心跳包中包含Brokerld、Broker地址(IP+port)、 Broker名称、Broker所属集群名称等等。
NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。
路由剔除
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除
NameServer中有一个定时任务,每隔10秒就会扫描一次Broker表, 查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。
路由发现
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由
默认客户端每30秒会拉取一次最新的路由。
Pull模型:客户端隔一段时间去拉取服务端的数据
- 可能存在“前脚拉完,后脚就改”的情况,实时性差
Push模型:服务端被客户端订阅后,一旦发生改变,立马推送到客户端
- 保证数据一致,实时性较好
- 但是需要一直维护两边的长连接,占用资源
Long Pulling模型:长轮询模型,客户端隔一段时间去拉取服务端的数据,且不立刻断开连接,而是服务端保持一段时间和客户端的连接,也就是Pull模型和Push模型的整合
- nacos的配置中心用的就是这个,监控服务端的配置信息,一旦变更,客户端立马变更
- 实时性较好
- 对资源占用较少
所以push模式适合Client不多,数据变化频繁,对实时性要求高的业务需求
客户端NameServer选择策略
这里的客户端指:producer和consumer
客户端在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢?
客户端首先会首先一个随机数, 然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。
如果连接失败,则会采用round robin轮询策略,逐个尝试着去连接其它节点。
即:首先采取随机策略进行选择,连接失败后再使用轮询策略
4.Broker中间人
功能介绍
Broker充当着消息中转角色,负责存储消息、转发消息
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为后费者的拉取请求作准备
Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移量offset、主题topic、队列queue等
模块构成
Remoting Module:
整个Broker的实体,负责处理来自clients端的请求。这个Broker实体则由以下模块构成。
-
**Client Manager:**客户端管理器
-
负责接收、解析客户端(Producer/Consumer)请求, 管理客户端
-
例如,维护Consumer的Topic订阅信息
-
-
Store Service:存储服务
- 提供方便简单的API接口,处理消息存储到物理硬盘和消息查询*功能
-
HA Service:高可用服务
- 提供Master Broker和Slave Broker之间的数据同步功能
-
Index Service:索引服务
- 根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能
集群部署
为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。
不过,这里有个问题,如果某Broker节点宕机,如何保证数据不丢失呢?
其解决方案是,将每个Broker集群节点进行横向扩展,将Broker节点建为一个高可用的HA集群,解决单点问题
Broker节点集群是一个主从集群,即集群中具有Master与Slave两种角色。Master负责处理读写操作请求,Slave也可以负责读写操作请求(当master宕机后需要从slave中读写)
-
正常情况下都是操作master,slave只是备份服务,如果master宕机,则由slave顶上
-
一个Master可以包含多个Slave,但一个Slave只能属于一个Master。
-
Master与Slave的对应关系是通过指定相同的BrokerName、不同的BrokerId 来确定的。
-
BrokerId为0表示Master,非0表示Slave。
-
每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
5.工作流程
-
启动Name Server,开始监听端口,等待broker、producer和consumer的连接
-
启动Broker时, Broker会与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包
-
发送消息前,可以先创建Topic。创建Topic时需要指定该Topic要存储在哪些Broker上, 当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic
手动创建Topic:
集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量是相同的
broker模式:该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以是不同的
自动创建Topic:
默认采用的broker模式,默认给每个broker创建4个queue,可以在配置文件中更改
-
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接(先随机再轮询),并从NameServer中获取路由信息,即当前发送的Topic的Queue 与Broker的地址(IP+Port) 的映射关系。然后根据算法策略从队选择-个Queue,与队列所在的Broker建 立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息
-
Consumer跟Producer类似, 跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息, 然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是, Consumer还会向Broker发送心跳,以确保Broker的存活状态
读/写队列问题
从物理上来讲,读/写队列是同一个队列。
所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念。
一般情况下,读/写队列数量是相同的
例如,创建Topic时设置的写队列数量为8,读队列数量为4,此时系统会创建8个Queue,分别是0 1 2 3 4 5 67。
Producer会将消息写入到这8个队列,但Consumer只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的消息是不会被消费到的。
再如,创建Topic时设置的写队列数量为4,读队列数量为8,此时系统会创建8个Queue,分别是0 1 2 3 4 5 6 7。
Producer会将消息写入到0 1 2 3这4个队列,但Consumer只会消费0 1 2 3 4 5 6 7这8个队列中的消息,但是4 567中是没有消息的。此时假设Consumer Group中包含两个Consumer, Consumer1消费0 1 2 3,而Consumer2消费4567。但实际情况是,Consumer2是没有消息可消费的。
不管怎么样都会造成资源浪费等不好的情况,所以一般读写队列的数量要设计一致
单机的安装与启动
准备工作
硬件
准备一台linux,我用的vm虚拟机centos,输入ifconfig
获得ens33
的ip为inet 192.168.146.128
在C:\Windows\System32\drivers\etc\
修改host文件,将ip指向我自定义命名的centos域名
环境
- 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
- 64bit JDK 1.8+
- Maven 3.2.x
- Git
- 4g+ free disk for Broker server
查看我的linux有没有jdk环境echo $JAVA_HOME
,答案是没有
去官网下载linux的安装包,传到虚拟机
配置环境变量,输入3个值保存后,刷新profile
vi /etc/profile
# java
export JAVA_HOME=/usr/bin/java/jdk1.8.0_291
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib
source /etc/profile
测试java环境
javac
java -version
echo $JAVA_HOME
linux防火墙开放端口
CentOS升级到7之后,发现无法使用iptables控制Linux的端口,因为Centos 7使用firewalld代替了原来的iptables。下面记录如何使用firewalld开放Linux端口:
开启端口
firewall-cmd --zone=public --add-port=9800/tcp --permanent
查询端口是否开启
firewall-cmd --query-port=9800/tcp
重启防火墙
firewall-cmd --reload
查询有哪些端口是开启的
firewall-cmd --list-port
这里方法先放着,等会如果出现开启控制台无法访问时,应该是端口号没打开,例如我rocketmq-console选择9800为端口,所以需要开放9800端口
下载RocketMQ
官网地址
我这里下载的4.9.1版本的,根据自己需求下载即可
北京外国语镜像地址
- Source源码版本
- Binary编译后的文件
我们下载Binary文件,zip文件,然后传到linux上
unzip rocketmq-all-4.9.1-bin-release.zip
解压文件
修改启动内存
因为默认的内存太大了,2G、4G啥的,服务器的配置可能起不来,所以修改一下配置
runserver.sh
修改为256m
runbroker.sh
修改为256m
Linux下启动rocketmq
Start Name Server
# 启动nameserver服务
> nohup sh bin/mqnamesrv &
# 查看日志是否启动成功
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
# 查看当前java进程
> jps
240921 NamesrvStartup
246169 Jps
Start Broker
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
> jps
253009 BrokerStartup
254698 Jps
240921 NamesrvStartup
端口必须开放9876和10911端口
Send & Receive Messages
在发送/接收消息之前,我们需要告诉客户端名称服务器的位置。RocketMQ提供了多种方法来实现这一点。为了简单起见,我们使用环境变量namersv_ADDR
以下为官方提供的一个发消息和消费消息的用例
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=7F0000011BE34AA298B76656B91103E7, offsetMsgId=C0A8928000002A9F000000000002ECD2, messageQueue=MessageQueue [topic=TopicTest, brokerName=centos, queueId=2], queueOffset=249]
close the connection to remote address[127.0.0.1:9876] result: true
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=centos, queueId=0, storeSize=192, queueOffset=145, sysFlag=0, bornTimestamp=1634734558909, bornHost=/192.168.146.128:33198, storeTimestamp=1634734558910, storeHost=/192.168.146.128:10911, msgId=C0A8928000002A9F000000000001B352, commitLogOffset=111442, bodyCRC=953417484, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{
topic='TopicTest', flag=0, properties={
MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1634734753329, UNIQ_KEY=7F0000011BE34AA298B76656B6BD0245, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 56, 49], transactionId='null'}]]
Shutdown Servers
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
rocketmq控制台
github地址
下载解压,是一个maven项目
修改配置
进入src、resource文件夹,修改properties文件
默认端口是8080,这我们当然要改动了
而且他也是相当于一个client,所以需要去获取nameserver,所以需要配置nameserver的地址
server.port=9800
rocketmq.config.namesrvAddr=localhost:9876
导入依赖
导入JAXB依赖:Java Architecture for XML Binding
允许Java开发人员将Java类映射为XML表示方式。
JAXB提供两种主要特性:
- 将一个Java对象序列化为XML,以及反向操作,将XML解析成Java对象。
- 换句话说,JAXB允许以XML格式存储和读取数据,而不需要程序的类结构实现特定的读取XML和保存XML的代码。
<dependency>
<groupId>javax.xml.bindgroupId>
<artifactId>jaxb-apiartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>com.sun.xml.bindgroupId>
<artifactId>jaxb-implartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>com.sun.xml.bindgroupId>
<artifactId>jaxb-coreartifactId>
<version>2.3.0version>
dependency>
<dependency>
<groupId>javax.activationgroupId>
<artifactId>activationartifactId>
<version>1.1.1version>
dependency>
打包启动
跳过测试环节,目前不满足测试条件,会报错
mvn clean package -Dmaven.test.skip=true
得到结果则打包成功
[INFO] Building jar: E:\speciality_apps\RocketMQ\rocketmq-console-1.0.0\rocketmq-externals-rocketmq-console-1.0.0\rocketmq-console\target\rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 54.114 s
[INFO] Finished at: 2021-10-20T21:29:12+08:00
[INFO] ------------------------------------------------------------------------
rocketmq-console-ng-1.0.0-sources.jar
就是我们打包后的可执行jar,输入指令运行
java -jar rocketmq-console-ng-1.0.0.jar
出现运行成功和端口即可
[2021-10-20 21:41:23.209] INFO Tomcat started on port(s): 9800 (http)