ActiveMQ详细使用(含高级篇)
时间:2023-03-04 14:30:00
ActiveMQ
文章目录
- ActiveMQ
- 1、概述
- 2、使用
-
- 安装
- 启动&停止
- 3、Java编码实现ActiveMQ通讯
-
- 环境搭建
- 点对点信息传输域-队列(Queue)
-
- 消息生产者
- 消息消费者
-
- 方法1:阻塞消费者(receive)
- 方法二:异步监听消费者(监听器)onMessage())
- 发布订阅信息传输域-主题(topic)
-
- 发布主题制作人
- 主题消费者订阅
-
- 方法1:阻塞消费者(receive)
- 方法二:异步监听消费者(监听器)onMessage())
- 总结
- 4、JMS产品和着陆产品
-
- 简介
- JMS组成结构和特点
-
- JMS Provider
- JMS Producer
- JMS Consumer
- JSM Message(*)
-
- 消息头
- 消息属性
- 消息体
- JMS的可靠性
-
- 持久性(Persistent)
-
- 持久性-队列(Queue)
- 持久性-主题(Topic)
- 事务(Transaction)
-
- 非事务
- 事务
- 签收(Acknowledge)
-
- 非事务使用签收
- 签收用于事务
- 5、Spring整合ActiveMQ
-
- 环境搭建
- 队列(Queue)
- 主题(Topic)
- 消费者使用监听收到消息
- 6、SpringBoot整合ActiveMQ
-
- 环境配置
- 队列(Queue)
-
- 添加配置文件
- 配置JMS
- 生产者
- 消费者
- 主题发布订阅(Topic)
-
- 添加配置文件
- 配置JMS
- 生产者
- 消费者
- 7、ActiveMQ的传输协议
-
- 协议
- 配置NIO
- 配置 NIO增强
- 8、ActiveMQ新闻存储和持久化
-
- KahaDB消息存储(默认)
- JDBC存储消息
-
- 步骤
- 总结
- JDBC Message store with ActiveMQ Journal
- 9、ActiveMQ多节点集群
-
- zookeeper replicated-leveldb-store主从集群部署(待更新)
1、概述
- 官网:http://activemq.apache.org/
- 定义:发送者向消息服务器发送消息,将消息存储在多个队列/主题中topic在适当的时候,消息服务器会将消息转发给接受者。在此过程中,发送和接收是异步的,即发送不需要等待,发送和接收的生命周期之间没有必然的关系
- 特点:解耦、削峰、异步
2、使用
安装
- 下载到官网后,可以用来解压
- 修改activemq的conf目录中jetty.xml文件(访问外部设备)
启动&停止
-
普通启动:进入到activemq的bin目录中
- ./activemq start
- ./activemq start
3、Java编码实现ActiveMQ通讯
- 目的地Destination:队列(Queue)和主题(Topic)
环境搭建
-
第一步:创建Maven工程
-
第二步:引入配置(pom.xml)
<dependency> <groupId>org.apache.activemqgroupId> <artifactId>activemq-allartifactId> <version>5.15.11version> dependency> <dependency> <groupId>org.apache.xbeangroupId> <artifactId>xbean-springartifactId> <version>4.15version> dependency>
点对点的消息传递域——队列(Queue)
- 特点
- 1、每个消息只能有一个消费者,类似于1对1的关系。
- 2、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看
- 3、消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息
消息生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616"; //ActiveMQ地址
public static final String QUEUE_NAME = "queue001"; //队列名称
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列(queue)还是主题(top))
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6、创建消息并赋值
TextMessage textMessage = session.createTextMessage(); //String类型的
textMessage.setText("fzk");
//7、发送消息到队列或主题
messageProducer.send(textMessage);
messageProducer.send(mapMessage);
//8、关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送完毕");
}
}
-
控制台
-
控制台说明
- Name=队列名称
- Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数
- Number Of Consumers=消费者数量,消费者端的消费者数量。
- Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
- Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
消息消费者
方式一:阻塞式消费者(receive)
- 订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616"; //ActiveMQ地址
public static final String QUEUE_NAME = "queue001"; //队列名称
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列(queue)还是主题(top))
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的生产者
MessageConsumer messageConsumer = session.createConsumer(queue);
//阻塞式
while(true){
//6、接收队列或主题的消息
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if(textMessage != null){
//7、获取消息中的数据
String text = textMessage.getText();
System.out.println(text);
}else{
break;
}
}
//8、关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
方式二:异步监听式消费者(监听器onMessage())
- 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616"; public static final String QUEUE_NAME = "queue001"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消费者1"); //1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列(queue)还是主题(top)) Queue queue = session.createQueue(QUEUE_NAME); //5、创建消息的生产者 MessageConsumer messageConsumer = session.createConsumer(queue); //6、使用异步监听式来获取消息 messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if(message != null && message instanceof TextMessage){ //7、获取消息并通过消息获得数据 TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); //输入
键盘任意键结束 //8、关闭资源 messageConsumer.close(); session.close(); connection.close(); } }
发布订阅消息传递域——主题(topic)
-
特点
- 1、生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系
- 2、生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息
- 3、生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者
-
先启动订阅者再启动生产者,不然发送的消息是废消息
发布主题生产者
public class JmsProduceTopic {
public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列(queue)还是主题(top))
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6、创建消息并赋值
TextMessage textMessage = session.createTextMessage();
textMessage.setText("fzk");
//7、发送消息到队列或主题
messageProducer.send(textMessage);
//8、关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送完毕");
}
}
订阅主题消费者
方式一:阻塞式消费者(receive)
public class JmsConsumerTopic {
public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列(queue)还是主题(top))
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的生产者
MessageConsumer messageConsumer = session.createConsumer(queue);
//阻塞式
while(true){
//6、接收队列或主题的消息
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if(textMessage != null){
//7、获取消息中的数据
String text = textMessage.getText();
System.out.println(text);
}else{
break;
}
}
//8、关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
方式二:异步监听式消费者(监听器onMessage())
public class JmsConsumerTopic {
public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("消费者1");
//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列(queue)还是主题(top))
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建消息的生产者
MessageConsumer messageConsumer = session.createConsumer(topic);
//6、使用异步监听式来获取消息
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if(message != null && message instanceof TextMessage){
//7、获取消息并通过消息获得数据
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //输入键盘任意键结束
//8、关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
总结
比较项目 | Topic队列模式 | Queue队列模式 |
---|---|---|
工作模式 | "订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 | "负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会被丢弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |
4、JMS规范和落地产品
简介
- Java Message Service(Java消息服务是JavaEE中的一个技术)
- Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果
JMS的组成结构和特点
JMS Provider
- 实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
JMS Producer
- 消息生产者,创建和发送JMS消息的客户端应用
JMS Consumer
- 消息消费者,接收和处理JMS消息的客户端应用
JSM Message(*)
消息头
- JMSDestination:消息发送的目的地,主要是指队列Queue和主题Topic
- JMSDeliveryMode:持久模式和非持久模式
- 一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递
- 一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失
- JMSExpiration:过期时间(默认是永不过期)
- 消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值
- 如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期
- 如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除
- JMSPriority:消息优先级
- 从0-9十个级别,0-4是普通消息5-9是加急消息
- JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级
- JMSMessageID:唯一标识每个消息的标识由MQ产生
消息属性
-
发送和接收的消息类型必须一致对应
-
消息格式
-
TxtMessage : 普通字符串消息,包含一个String
TextMessage textMessage = session.createTextMessage(); //生产者 TextMessage textMessage = (TextMessage) message; //消费者
-
MapMessage :一个Map类型的消息,key为Strng类型,而值为Java基本类型
MapMessage mapMessage = session.createMapMessage(); //生产者 MapMessage mapMessage = (MapMessage) message; //消费者
-
BytesMessage : 二进制数组消息,包含一个byte[]
BytesMessage bytesMessage = session.createBytesMessage(); //生产者 BytesMessage bytesMessage = (BytesMessage) message; //消费者
-
StreamMessage : Java数据流消息,用标准流操作来顺序填充和读取
StreamMessage streamMessage = session.createStreamMessage(); //生产者 StreamMessage streamMessage = (StreamMessage) message; //消费者
-
ObjectMessage :对象消息,包含一个可序列化的Java对象
ObjectMessage objectMessage = session.createObjectMessage(); //生产者 ObjectMessage objectMessage = (ObjectMessage) message; //消费者
-
消息体
-
如果需要除消息字段以外的值,那么可以使用消息属性
-
识别/去重/重点标注等操作非常有用的方法
-
发送和接收的消息体类型必须一致对应
//生产者中消息体的使用 TextMessage textMessage = session.createTextMessage(); //String类型的 textMessage.setText("fzk"); textMessage.setStringProperty("name", "property"); //消息体 //消费者中消息体的获取(发送和接收的消息体必须一一对应) TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); System.out.println(textMessage元器件数据手册
、IC替代型号,打造电子元器件IC百科大全!