锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

RocketMQ学习笔记

时间:2023-09-02 03:07:01 3030b4通用型加速度传感器

RocketMQ学习笔记

文章目录

前言

以下前言的几个问题选自博主割肉机的博文:

一个用消息队列 的人,不知道为啥用 MQ,这就有点尴尬

本文学习参考自B站尚硅谷的【尚硅谷】2021新版RocketMQ教程丨深度掌握MQ消息中间件

下文的所以代码工程都放在了我的gitee仓库中rocketmq-study

为什么要使用消息队列?

解耦

传统模式的缺点:

系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

img

中间件模式的优点:

将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改

异步

传统模式的缺点:

一些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式的的优点:

将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

削峰

传统模式的缺点:

并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式的的优点:

系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的

使用了消息队列会有什么缺点?

我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防
从以下两个个角度来答

  • 系统可用性降低:本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是挂了。因此,系统可用性降低
  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

但是,我们该用还是要用的。

消息队列如何选型?

特性 ActiveMQ RabbitMQ RocketMQ kafka
开发语言 java erlang java scala
单机吞吐量 万级 万级 10万级 10万级
时效性 ms级 us级 ms级 ms级以内
可用性 高(主从架构) 高(主从架构) 非常高(分布式架构) 非常高(分布式架构)
功能特性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

综合上面的材料得出以下两点:

  1. 中小型软件公司,建议选RabbitMQ
    1. 一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。
    2. 正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。
    3. 不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。
    4. 不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。
  2. 大型软件公司,根据具体使用在rocketMq和kafka之间二选一
    1. 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的
    2. 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景

如何保证消息队列是高可用的?

在第二点说过了,引入消息队列后,系统的可用性下降。

在生产中,没人使用单机模式的消息队列。

因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。

如果面试的时候,面试官问,你们的消息中间件如何保证高可用的?

你的回答只是表明自己只会订阅和发布消息,面试官就会怀疑你是不是只是自己搭着玩,压根没在生产用过。

如何保证消息不被重复消费?

这个问题其实换一种问法就是,如何保证消息队列的幂等性?

这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。

先来说一下为什么会造成重复消费?

其实无论是那种消息队列,造成重复消费原因其实都是类似的。

  • 正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。
  • 只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。

造成重复消费的原因?

  • 就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

如何解决?

这个问题针对业务场景来答分以下几点

  1. 比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  2. 再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
  3. 如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。
    1. 以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

如何保证消费的可靠性传输?

我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。

如果无法做到可靠性传输,可能给公司带来千万级别的财产损失。同样的,如果可靠性传输在使用过程中,没有考虑到,这不是给公司挖坑么,你可以拍拍屁股走了,公司损失的钱,谁承担?

还是那句话,认真对待每一个项目,不要给公司挖坑。

其实这个可靠性传输,每种MQ都要从三个角度来分析:

  • 生产者弄丢数据
  • 消息队列弄丢数据
  • 消费者弄丢数据

如何保证消息的顺序性?

其实并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。

针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。

然后只用一个消费者去消费该队列。

有的人会问:**那如果为了吞吐量,有多个消费者去消费怎么办? **

这个问题,没有固定回答的套路。

比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。

如果是这样一个业务场景,那只要重试就行。

比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。

总之,针对这个问题,我的观点是**保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。 **

简介

什么是MQ?

MQ,message queue消息队列,一种提供消息队列服务的中间件,是一套提供了消息生产、储存、消费全过程的API软件系统

消息即数据

MQ用途

削峰限流

MQ可以将系统的超量请求暂存其中,以便系统慢慢处理,避免请求丢失或系统被压垮

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一一个MQ层。

后台保证了数据库一定能正常修改运行,所以我无序同步等待执行结束,而是直接在用户请求后返回订单号,接着异步去处理数据库

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等

针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

常见的MQ协议

MOM是什么?

MOM(Message Oriented Middleware)是面向消息的中间件,使用消息传送提供者来协调消息传送操作。MOM 需要提供 API 和管理工具。客户端使用api调用,把消息发送到由提供者管理的目的地。在发送消息之后,客户端会继续执行其他工作,并且在接收方收到这个消息确认之前,提供者一直保留该消息

  • PO:面向过程,Process Oriented
  • OO:面向对象,Object Oriented
  • AO:面向切面,Aspect Oriented

JMS

java message service,java消息服务

是java平台有关MOM的技术规范,它便于消息系统中的java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

ActiveMQ是该协议的典型实现

STOMP

Stream Text Orientated Message Protocol,面向文本流的消息协议

是一种MOM设计的简单文本协议,STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

ActiveMQ是该协议的典型实现,RabbitMQ通过插件可以支持该协议

AMQP

Advanced Message Queuing Protocol,高级消息队列协议

一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种MOM设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

RabbitMQ是该协议的典型实现。

MQTT

Message Queuing Telemetry Transport,消息队列遥测传输

是IBM开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗IoT设备(物联网)间的通信。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。

RabbitMQ通过插件可以支持该协议。

以上协议,RockerMQ都不支持

基本概念

消息与主题

消息(Message)

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

主题(Topic)

topic表示一类消息的集合,每个主题包含若干个消息,每条消息只属于一个主题,是对rocketmq进行消息订阅的基本单位

一个生产者可以发送多种topic的消息,而一个消费者只对某种特定的topic感兴趣,即只可以订阅和消费一种topic的消息

标签(Tags)

消息标签,用来进一步区分某个 Topic 下的消息分类,消息队列 RocketMQ 允许消费者按照 Tag 对消息进行过滤,确保消费者最终只消费到他关注的消息类型

Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以说是二级分类,关系如图所示

队列(Queue)

储存消息的实体,一个topic中可以含有多个queue,每个queue中储存的是该topic的信息

一个topic的queue也被称为一个topic中消息的分区

一个topic的queue的消息只能被一个消费者组中的消费者进行读取消费,且一个queue的消息不允许同一个消费者组里多个消费者同时消费(后面说的广播模式推翻了这个说法)

queue:消费者 => 1:1

消费者:queue => 1:n

为了提升MQ的效率,所以将topic分为多个分区去进行读取(个人感觉有点像负载均衡)

注:1号狗消费了1号骨头,就不能和2号狗去争夺2号骨头了

如果此时只有1和2号两只狗的时候,就可以让1号消费1、2号骨头,而2号狗消费3号骨头了

就是说一个消费者组里不能互相争夺分区的消息

在学习参考其它相关资料时,还会看到一个概念:分片(Sharding)

官方是没有这个概念的,但是在网上有这种说法

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker的数量。

每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

消息标识(MessageId/Key)

  • RocketMQ中每个消息拥有唯一的MessageId, 且可以携带具有业务标识的Key,以方便对消息的查询
  • 不过需要注意的是,MessageId有两个: 在生产者send0消息时会自动生成一个MessageId (msgId), 当消息到达Broker后,Broker也会自动生成一 个Messageld(称为offsetMsgId)
  • msgId、 offsetMsgId与key都称为消息标识
    • msgId:由produce端生成,生成规则为
      • producerIp+进程Pid+MessageClientSetter类的ClassLoader的hashCode+当前时间+AutomiInteger自增器
    • offsetMsgId:由broker端生成,生成规则为
      • brokerIp+物理分区的offset(也就是queue的偏移量)
    • key:由用户指定的业务相关的唯一标识

系统架构

1.Producer生产者

消息生产者,负责生产消息

比如我将系统的日志写入MQ,就是一个消息的生成

将用户的请求写入MQ,也是一个消息的生成

Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。

生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息(我的理解是可以生产多个主题的消息,但是发送时要保持消息的主题是一致的)

2.Consumer消费者

消息消费者,负责消费消息

从MQ取出日志信息进行解析,就是消息的消费

用户下单后,从MQ中读取用户的请求并进行处理的过程,也是消息的消费过程

一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理

RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的

消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,一类消费者只能消费一种topic的消息

消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易

  • 负载均衡:将一个topic中不同的queue分配给不同的consumer,看来我前面说很像负载均衡说对了
  • 容错:一个consumer挂了,还有别的consumer可以接着消费(分布式微服务容错机制)

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

一个topic类型的消息可以被多个消费者组消费

注:

  • 一个消费者组里消费者必须订阅完全相同的topic
  • 一个消费者组只能消费一个topic的消息,不能同时消费多个topic的消息

3.Name Server注册中心

NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。

主要包括两个功能:

  • Broker管理:

    • 接受Broker集群的注册信息并且保存下来作为路由信息的基本数据
    • 提供心跳检测机制,检查Broker是否还存活。
  • 路由信息管理:

    • 每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。
    • Producer和Consumer通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。

路由注册

NameServer通常也是以集群的方式部署,不过, NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯

那各节点中的数据是如何进行数据同步的呢?

在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。

在NameServer内部维护着1个Broker列表, 用来动态存储Broker的信息。

注意:

与其他zk、euraka、nacos等注册中心不同,别的都是注册一个然后内部通信,nameserver是所有都要注册

优点:无状态的集群搭建很简单

缺点:对于broker必须明确指出nameserver地址,未指出的不会进行注册,使得扩容、维护不方便

Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。

心跳包中包含Brokerld、Broker地址(IP+port)、 Broker名称、Broker所属集群名称等等。

NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。

路由剔除

由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除

NameServer中有一个定时任务,每隔10秒就会扫描一次Broker表, 查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。

路由发现

RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由

默认客户端每30秒会拉取一次最新的路由。

Pull模型:客户端隔一段时间去拉取服务端的数据

  • 可能存在“前脚拉完,后脚就改”的情况,实时性差

Push模型:服务端被客户端订阅后,一旦发生改变,立马推送到客户端

  • 保证数据一致,实时性较好
  • 但是需要一直维护两边的长连接,占用资源

Long Pulling模型:长轮询模型,客户端隔一段时间去拉取服务端的数据,且不立刻断开连接,而是服务端保持一段时间和客户端的连接,也就是Pull模型和Push模型的整合

  • nacos的配置中心用的就是这个,监控服务端的配置信息,一旦变更,客户端立马变更
  • 实时性较好
  • 对资源占用较少

所以push模式适合Client不多,数据变化频繁,对实时性要求高的业务需求

客户端NameServer选择策略

这里的客户端指:producer和consumer

客户端在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢?

客户端首先会首先一个随机数, 然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。

如果连接失败,则会采用round robin轮询策略,逐个尝试着去连接其它节点。

即:首先采取随机策略进行选择,连接失败后再使用轮询策略

4.Broker中间人

功能介绍

Broker充当着消息中转角色,负责存储消息转发消息

Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为后费者的拉取请求作准备

Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移量offset、主题topic、队列queue等

模块构成

Remoting Module

整个Broker的实体,负责处理来自clients端的请求。这个Broker实体则由以下模块构成。

  • **Client Manager:**客户端管理器

    • 负责接收、解析客户端(Producer/Consumer)请求, 管理客户端

    • 例如,维护Consumer的Topic订阅信息

  • Store Service:存储服务

    • 提供方便简单的API接口,处理消息存储到物理硬盘消息查询*功能
  • HA Service:高可用服务

    • 提供Master Broker和Slave Broker之间的数据同步功能
  • Index Service:索引服务

    • 根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能

集群部署

为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。

不过,这里有个问题,如果某Broker节点宕机,如何保证数据不丢失呢?

其解决方案是,将每个Broker集群节点进行横向扩展,将Broker节点建为一个高可用的HA集群,解决单点问题

Broker节点集群是一个主从集群,即集群中具有MasterSlave两种角色。Master负责处理读写操作请求,Slave也可以负责读写操作请求(当master宕机后需要从slave中读写)

  • 正常情况下都是操作master,slave只是备份服务,如果master宕机,则由slave顶上

  • 一个Master可以包含多个Slave,但一个Slave只能属于一个Master。

  • Master与Slave的对应关系是通过指定相同的BrokerName、不同的BrokerId 来确定的。

  • BrokerId为0表示Master,非0表示Slave。

  • 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

5.工作流程

  1. 启动Name Server,开始监听端口,等待broker、producer和consumer的连接

  2. 启动Broker时, Broker会与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包

  3. 发送消息前,可以先创建Topic。创建Topic时需要指定该Topic要存储在哪些Broker上, 当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic

    手动创建Topic:

    ​ 集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量是相同的

    ​ broker模式:该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以是不同的

    自动创建Topic:

    ​ 默认采用的broker模式,默认给每个broker创建4个queue,可以在配置文件中更改

  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接(先随机再轮询),并从NameServer中获取路由信息,即当前发送的Topic的Queue 与Broker的地址(IP+Port) 的映射关系。然后根据算法策略从队选择-个Queue,与队列所在的Broker建 立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息

  5. Consumer跟Producer类似, 跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息, 然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是, Consumer还会向Broker发送心跳,以确保Broker的存活状态

读/写队列问题

从物理上来讲,读/写队列是同一个队列。

所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念。

一般情况下,读/写队列数量是相同的

例如,创建Topic时设置的写队列数量为8,读队列数量为4,此时系统会创建8个Queue,分别是0 1 2 3 4 5 67。

Producer会将消息写入到这8个队列,但Consumer只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的消息是不会被消费到的。

再如,创建Topic时设置的写队列数量为4,读队列数量为8,此时系统会创建8个Queue,分别是0 1 2 3 4 5 6 7。

Producer会将消息写入到0 1 2 3这4个队列,但Consumer只会消费0 1 2 3 4 5 6 7这8个队列中的消息,但是4 567中是没有消息的。此时假设Consumer Group中包含两个Consumer, Consumer1消费0 1 2 3,而Consumer2消费4567。但实际情况是,Consumer2是没有消息可消费的。

不管怎么样都会造成资源浪费等不好的情况,所以一般读写队列的数量要设计一致

单机的安装与启动

准备工作

硬件

准备一台linux,我用的vm虚拟机centos,输入ifconfig获得ens33的ip为inet 192.168.146.128

C:\Windows\System32\drivers\etc\修改host文件,将ip指向我自定义命名的centos域名

环境

  1. 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
  2. 64bit JDK 1.8+
  3. Maven 3.2.x
  4. Git
  5. 4g+ free disk for Broker server

查看我的linux有没有jdk环境echo $JAVA_HOME,答案是没有

去官网下载linux的安装包,传到虚拟机

配置环境变量,输入3个值保存后,刷新profile

vi /etc/profile

# java
export JAVA_HOME=/usr/bin/java/jdk1.8.0_291
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib

source /etc/profile

测试java环境

javac
java -version
echo $JAVA_HOME

linux防火墙开放端口

CentOS升级到7之后,发现无法使用iptables控制Linux的端口,因为Centos 7使用firewalld代替了原来的iptables。下面记录如何使用firewalld开放Linux端口:

开启端口

firewall-cmd --zone=public --add-port=9800/tcp --permanent

查询端口是否开启

firewall-cmd --query-port=9800/tcp

重启防火墙

firewall-cmd --reload

查询有哪些端口是开启的

firewall-cmd --list-port

这里方法先放着,等会如果出现开启控制台无法访问时,应该是端口号没打开,例如我rocketmq-console选择9800为端口,所以需要开放9800端口

下载RocketMQ

官网地址

我这里下载的4.9.1版本的,根据自己需求下载即可

北京外国语镜像地址

  • Source源码版本
  • Binary编译后的文件

我们下载Binary文件,zip文件,然后传到linux上

unzip rocketmq-all-4.9.1-bin-release.zip解压文件

修改启动内存

因为默认的内存太大了,2G、4G啥的,服务器的配置可能起不来,所以修改一下配置

runserver.sh

修改为256m

runbroker.sh

修改为256m

Linux下启动rocketmq

Start Name Server

 	# 启动nameserver服务
  > nohup sh bin/mqnamesrv &
  	# 查看日志是否启动成功
  > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...
  	# 查看当前java进程
  > jps
  240921 NamesrvStartup
  246169 Jps

Start Broker

  > nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
  > tail -f ~/logs/rocketmqlogs/broker.log
  The broker[%s, 172.30.30.233:10911] boot success...
  > jps
  253009 BrokerStartup
  254698 Jps
  240921 NamesrvStartup

端口必须开放9876和10911端口

Send & Receive Messages

在发送/接收消息之前,我们需要告诉客户端名称服务器的位置。RocketMQ提供了多种方法来实现这一点。为了简单起见,我们使用环境变量namersv_ADDR

以下为官方提供的一个发消息和消费消息的用例

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId=7F0000011BE34AA298B76656B91103E7, offsetMsgId=C0A8928000002A9F000000000002ECD2, messageQueue=MessageQueue [topic=TopicTest, brokerName=centos, queueId=2], queueOffset=249]
 close the connection to remote address[127.0.0.1:9876] result: true
 
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=centos, queueId=0, storeSize=192, queueOffset=145, sysFlag=0, bornTimestamp=1634734558909, bornHost=/192.168.146.128:33198, storeTimestamp=1634734558910, storeHost=/192.168.146.128:10911, msgId=C0A8928000002A9F000000000001B352, commitLogOffset=111442, bodyCRC=953417484, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{ 
        topic='TopicTest', flag=0, properties={ 
        MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1634734753329, UNIQ_KEY=7F0000011BE34AA298B76656B6BD0245, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 56, 49], transactionId='null'}]]
 

Shutdown Servers

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

rocketmq控制台

github地址

下载解压,是一个maven项目

修改配置

进入src、resource文件夹,修改properties文件

默认端口是8080,这我们当然要改动了

而且他也是相当于一个client,所以需要去获取nameserver,所以需要配置nameserver的地址

server.port=9800
rocketmq.config.namesrvAddr=localhost:9876

导入依赖

导入JAXB依赖:Java Architecture for XML Binding

允许Java开发人员将Java类映射为XML表示方式。

JAXB提供两种主要特性:

  • 将一个Java对象序列化为XML,以及反向操作,将XML解析成Java对象。
  • 换句话说,JAXB允许以XML格式存储和读取数据,而不需要程序的类结构实现特定的读取XML和保存XML的代码。
<dependency>
    <groupId>javax.xml.bindgroupId>
    <artifactId>jaxb-apiartifactId>
    <version>2.3.0version>
dependency>
<dependency>
    <groupId>com.sun.xml.bindgroupId>
    <artifactId>jaxb-implartifactId>
    <version>2.3.0version>
dependency>
<dependency>
    <groupId>com.sun.xml.bindgroupId>
    <artifactId>jaxb-coreartifactId>
    <version>2.3.0version>
dependency>
<dependency>
    <groupId>javax.activationgroupId>
    <artifactId>activationartifactId>
    <version>1.1.1version>
dependency>

打包启动

跳过测试环节,目前不满足测试条件,会报错

mvn clean package -Dmaven.test.skip=true

得到结果则打包成功

[INFO] Building jar: E:\speciality_apps\RocketMQ\rocketmq-console-1.0.0\rocketmq-externals-rocketmq-console-1.0.0\rocketmq-console\target\rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  54.114 s
[INFO] Finished at: 2021-10-20T21:29:12+08:00
[INFO] ------------------------------------------------------------------------

rocketmq-console-ng-1.0.0-sources.jar就是我们打包后的可执行jar,输入指令运行

java -jar rocketmq-console-ng-1.0.0.jar

出现运行成功和端口即可

[2021-10-20 21:41:23.209]  INFO Tomcat started on port(s): 9800 (http) 

相关文章