1. MQ理解
1.1 MQ的产品种类和对比
MQ即消息中间件。MQ是一种理念,ActiveMQ是MQ的落地产品。
消息中间件产品
各类MQ对比
- Kafka
- 编程语言:Scala
- 大数据领域的主流MQ
- RabbitMQ
- 编程语言:Erlang
- 基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。
- RocketMQ
- 编程语言:Java
- 适用于大型项目,适用于集群
- ActiveMQ
- 编程语言:Java
- 适用于中小型项目
1.2 MQ产生背景
系统之间直接调用存在的问题?
微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例。这些架构会有哪些问题?
-
系统之间接口耦合比较严重
每新增一个下游功能,都要对上游的相关接口进行改造。举个例子:如果系统A要发送数据给系统B和系统C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送。当代码上线后又新增了一个需求:把数据也发送给D,新上了一个D系统也要接受A系统的数据,此时就需要修改A系统,让他感知到D系统的存在,同时把数据处理好再给D。在这个过程你会看到每接入一个下游系统都要对系统A进行代码改造,开发联调的效率很低。其整体架构如下图:
-
面对大流量并发时容易被冲垮
每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时容易被冲垮。举例秒杀业务:上游系统发起下单购买操作就是下单一个操作很快就完成。然而下游系统要完成秒杀业务后面的所有逻辑(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)。
-
等待同步存在性能问题
RPC接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。
根据上述的几个问题,在设计系统时可以明确要达到的目标:
-
要做到系统解耦,当新的模块接进来时可以做到代码改动最小,能够解耦
-
设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费不被冲垮,能削峰
-
强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力,能够异步
1.3 MQ主要作用
异步
调用者无需等待解耦
解决了系统之间耦合调用的问题消峰
抵御洪峰流量,保护了主业务
1.4 MQ的定义
面向消息的中间件(message-oriented middleware
)MOM能够很好的解决以上问题。是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。
大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候消息服务器会将消息转发给接受者。在这个过程中发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系。尤其在发布pub/订阅sub模式下,也可以完成一对多的通信即让一个消息有多个接受者。
1.5 MQ特点
采用异步处理模式
消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上。消息接收者则订阅或者监听该通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。
案例:也就是说一个系统跟另一个系统之间进行通信的时候,假如系统A希望发送一个消息给系统B让他去处理。但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ然后就不管这条消息的“死活了”,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理都是系统B的事儿与系统A无关。
应用系统之间解耦合
发送者和接受者不必了解对方,只需要确认消息。发送者和接受者不必同时在线。
整体架构
MQ缺点
两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。
1.6 CentOS7安装ActiveMQ
cd /root
mkdir active_mq
tar -xzvf apache-activemq-5.14.0-bin.tar.gz
# /etc/init.d/目录增加增加activemq文件
cd /etc/init.d/
vim activemq
#!/bin/sh
#
# /etc/init.d/activemq
# chkconfig: 345 63 37
# description: activemq servlet container.
# processname: activemq 5.14.0
# Source function library.
#. /etc/init.d/functions
# source networking configuration.
#. /etc/sysconfig/network
export JAVA_HOME=/root/java/jdk1.8.0_221
export CATALINA_HOME=/root/active_mq/apache-activemq-5.14.0
case $1 in
start)
sh $CATALINA_HOME/bin/activemq start
;;
stop)
sh $CATALINA_HOME/bin/activemq stop
;;
restart)
sh $CATALINA_HOME/bin/activemq stop
sleep 1
sh $CATALINA_HOME/bin/activemq start
;;
esac
exit 0
# 对activemq文件授予权限
chmod 777 activemq
# 设置开机启动并启动activemq
chkconfig activemq on
service activemq start
# 启动时指定日志输出文件,activemq日志默认的位置是在:%activemq安装目录%/data/activemq.log
service activemq start > /root/active_mq/activemq.log
# 访问地址:http://IP地址:8161/
# 默认账户:admin/admin
# 61616 端口提供JMS服务
# 8161 端口提供管理控制台服务
# 查看activemq状态
service activemq status
# 关闭activemq服务
service activemq stop
2. Java程序生成消息基本案例
2.1 JMS简介
JMS 总体编码规范
JMS开发基本步骤
Destination
Destination 即目的地。下面拿 jvm 和 mq 做个对比,目的地可以理解为是数据存储的地方。
两种Destination
2.2 Idea新建Maven工程
<!-- activemq 所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- junit/log4j等基础配置 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependency>
2.3 队列消息(Queue)
队列消息特点
点对点消息传递域的特点如下
- 每个消息只能有一个消费者,类似 1对1 的关系。
- 消息的生产者和消费者之间 没有时间上的相关性。无论消费者在生产者发送消息时是否处于运行状态,消费者都可以提取消息。如我们发送短信,发送者发送后接受者不一定会及收及看。
- 消息被消费后队列 不会再存储,所以消费者 不会消费到已经被消费过的消息。
队列消息生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂获得连接connection并启动访问
Connection conn = factory.createConnection();
conn.start();
//3.创建会话session
// 两个参数:事务,签收
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
// Destination -> Queue/Topic
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
//6.通过使用消息生产者发送三条消息到MQ队列中
for (int i = 0; i < 3; i++) {
//创建消息
TextMessage textMessage = session.createTextMessage("msg -> " + i);
//通过消息生产者发送给MQ
producer.send(textMessage);
}
//7.关闭资源
producer.close();
session.close();
conn.close();
System.out.println("====> 消息发布到MQ完成");
}
}
队列消息消费者 – 同步阻塞式 receive
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
// reveive():一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式,和socket的accept方法类似的。
// reveive(Long time):等待n毫秒之后还没有收到消息就结束阻塞。
// 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
TextMessage message = (TextMessage) consumer.receive(4000L);
if (null != message) {
System.out.println("====> 消费者的消息:" + message.getText());
} else {
break;
}
}
consumer.close();
session.close();
conn.close();
}
}
队列消息消费者 – 异步非阻塞监听式 MessageListener
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
// 监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消费者接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //保证控制台不停
consumer.close();
session.close();
conn.close();
}
}
消费者三种情况
- 先生产,只启动一个消费者 ① => ①消费者会消费掉全部消息
- 先生产,然后先启动消费者①,再启动消费者② => ①消费者会消费掉全部消息,②消费者不能消费消息
- 先启动消费者①和②,再生产 => ①和②轮询消费,各自消费一半消息
2.4 主题消息(Topic)
主题消息特点
在发布订阅消息传递域中,目的地被称为主题(topic)。
发布/订阅消息传递域的特点如下:
-
生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系。
-
生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费 自它订阅之后发布的消息。
-
生产者生产时,topic 不保存消息,它是 无状态的 不落地的,假如无人订阅就去生产那就是一条废消息,所以一般先启动消费者再启动生产者。
默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。
主题消息生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//只有这一步和Queue有区别
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息发布到MQ完成");
}
}
主题消息消费者
存在多个消费者,每个消费者都能收到自从自己启动后所有生产的消息。
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws Exception {
System.out.println("=====> 1号消费者");//多加几个消费者做实验
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//只有这一步和Queue有区别
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消费者接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
2.5 Topic和Queue对比
3. JMS (Java消息服务) 详解
3.1 Java消息服务是什么
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
3.2 JMS四大组成元素
Message – 消息头
JMS的消息头有哪些属性:
-
JMSDestination
:消息目的地。主要是指Queue和Topic。 -
JMSDeliveryMode
:消息持久化模式。分为持久模式和非持久模式,一条持久性的消息应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。 -
JMSExpiration
:消息过期时间。可以设置消息在一定时间后过期,默认是永不过期消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。如果发送后在消息过期时间之后还没有被发送到目的地,则该消息被清除。 -
JMSPriority
:消息的优先级。消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。 JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。 -
JMSMessageID
:消息的唯一标识符。唯一标识每个消息的标识由MQ产生,也可以自己指定但是每个消息的标识要求唯一。
说明:消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
//这里可以指定每个消息的目的地
textMessage.setJMSDestination(topic);
//消息的模式,持久模式/非持久模式
textMessage.setJMSDeliveryMode(0);
//消息的过期时间
textMessage.setJMSExpiration(1000);
//消息的优先级
textMessage.setJMSPriority(10);
//指定每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
textMessage.setJMSMessageID("ABCD");
//上面的属性也可以通过send重载方法进行设置
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息发布到MQ完成");
}
}
Message – 消息体
理解:封装具体的消息数据
五种消息格式
注意:发送和接收的消息体类型必须一致对应
消息生产者
for (int i = 0; i < 3; i++) {
// 发送TextMessage消息体
TextMessage textMessage = session.createTextMessage("topic " + i);
producer.send(textMessage);
// 发送MapMessage 消息体。set方法添加,get方式获取
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","rose" + i);
mapMessage.setInt("age", 18 + i);
producer.send(mapMessage);
}
消息消费者
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消费者接受到text消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("name"));
System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Message – 消息属性
如果需要除消息头字段之外的值那么可以使用消息属性。它是 识别 / 去重 / 重点标注 等操作非常有用的方法。
它们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
下图是设置消息属性的API:
生产者
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topic " + i);
// 调用Message的set*Property()方法就能设置消息属性
// 根据value的数据类型的不同,有相应的API
textMessage.setStringProperty("From","rose@qq.com");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
producer.send(textMessage);
}
消费者
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息体:" + textMessage.getText());
System.out.println("消息属性:" + textMessage.getStringProperty("From"));
System.out.println("消息属性:" + textMessage.getByteProperty("Spec"));
System.out.println("消息属性:" + textMessage.getBooleanProperty("Invalide"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3.5 JMS的可靠性
RERSISTENT – 持久性
什么是持久化消息 => 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者,虽然这样增加了消息传送的开销但却增加了可靠性。
我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
- Queue消息非持久和持久
- Queue非持久,当服务器宕机消息不存在(消息丢失了)。
注意:只要服务器没有宕机,即便是非持久,消费者不在线的话消息也不会丢失,等待消费者在线还是能够收到消息的。
//非持久化的消费者和之前的代码一样。下面演示非持久化的生产者。
// 非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Queue持久化,当服务器宕机消息依然存在。Queue消息默认是持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
//持久化的消费者和之前的代码一样。下面演示持久化的生产者。
//持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- Topic消息非持久和持久
-
Topic非持久,Topic默认就是非持久化的,因为生产者生产消息时消费者也要在线,这样消费者才能消费到消息。
-
Topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
//持久化topic生产者代码
// 设置持久化topic
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 设置持久化topic之后再启动连接
conn.start();
//持久化topic消费者代码
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = activeMQConnectionFactory.createConnection();
// 设置客户端ID,向MQ服务器注册自己的名称
conn.setClientID("marrry");
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 之后再开启连接
connection.start();
//之前是消息的消费者,这里就改为主题的订阅者
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic:" + textMessage.getText());
message = topicSubscriber.receive(2000L);//继续监听2s,从激活到离线
//经测试:离线再激活后仍然能收到之前的消息
}
session.close();
conn.close();
}
注意:
一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
然后再运行生产者发送消息。
之后无论消费者是否在线都会收到消息。如果不在线的话,下次连接的时候会把没有收过的消息都接收过来。
Transaction – 事务
生产者开启事务后,执行commit方法这批消息才真正的被提交。不执行commit方法这批消息不会提交。执行rollback方法之前的消息会回滚掉。生产者的事务机制要高于签收机制,当生产者开启事务后签收机制不再重要。
消费者开启事务后,执行commit方法这批消息才算真正的被消费。不执行commit方法这些消息不会标记已消费,下次还会被消费。执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。
注意:消费者和生产者需要同时操作事务才行吗? => 消费者和生产者的事务完全没有关联,各自是各自的事务。
- 生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
//1.创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
//设置为开启事务
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topic " + i);
producer.send(textMessage);
// if(i == 2) {
// throw new RuntimeException("=====> GG");
// }
}
// 2. 开启事务后,使用commit提交事务,这样这批消息才能真正的被提交。
session.commit();
System.out.println("====> 消息发布到MQ完成");
} catch (JMSException e) {
System.out.println("出现异常,消息回滚");
// 3. 工作中一般当代码出错我们在catch代码块中回滚。这样这批发送的消息就能回滚。
session.rollback();
} finally {
producer.close();
session.close();
conn.close();
}
}
}
//如果有一条抛出异常,则回滚
//Exception in thread "main" java.lang.RuntimeException: =====> GG
- 消费者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws Exception {
System.out.println("=====> 1号消费者");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
// 创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
// 消费者开启了事务就必须手动提交,不然会重复消费消息
final Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
int a = 0;
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息体:" + textMessage.getText());
if(a == 0){
System.out.println("commit");
session.commit();
}
if (a == 2) {
System.out.println("rollback");
session.rollback();
}
a++;
} catch (JMSException e) {
System.out.println("出现异常,消费失败,放弃消费");
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
});
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
// 不执行commit方法的1和2消息不会标记已消费,下次还会被消费
// 执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费
// =====> 1号消费者
// 消息体:topic 0
// commit
// 消息体:topic 1
// 消息体:topic 2
// rollback
// 消息体:topic 1
// 消息体:topic 2
Acknowledge – 签收
签收的几种方式
-
自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的,该种方式无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
-
手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收,该种方式需要我们手动调用Message.acknowledge()来签收消息。如果不签收消息该消息会被我们反复消费直到被签收。
-
允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全可能会重复消费。该种方式很少使用到。
-
事务下的签收(Session.SESSION_TRANSACTED):开启事务的情况下可以使用该方式,该种方式很少使用到。
事务和签收的关系
-
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚则消息会被再次传送。事务优先于签收,开始事务后签收机制不再起任何作用。
-
非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
-
生产者事务开启,只有commit后才能将全部消息变为已消费。
-
事务偏向生产者,签收偏向消费者。也就是说生产者使用事务更好点,消费者使用签收机制更好点。
非事务下的消费者如何使用手动签收的方式
- 非事务下的生产者跟之前的代码一样
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息发布到MQ完成");
}
}
- 非事务下的消费者如何手动签收
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
//这里改为Session.CLIENT_ACKNOWLEDGE
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage message = (TextMessage) consumer.receive(4000L);
if (null != message) {
System.out.println("====> 消费者的消息:" + message.getText());
//设置为Session.CLIENT_ACKNOWLEDGE后,要调用该方法,标志着该消息已被签收(消费)。
//如果不调用该方法,该消息的标志还是未消费,下次启动消费者或其他消费者还会收到改消息。
message.acknowledge();
} else {
break;
}
}
consumer.close();
session.close();
conn.close();
}
}
注意:JMS保证可靠有四种方式,除了上面讲到的持久性,事务,签收,还可以通过多节点集群的方式来保证可靠性。
3.6 JMS的点对点总结
点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能,和我们平时给朋友发送短信类似。
如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
队列可以长久地保存消息直到消费者收到消息,消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
3.7 JMS的发布订阅总结
JMS的发布订阅总结
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作Topic。
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立,不需要解除即可保证消息的传送。
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
持久订阅
客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。
当非持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。
非持久和持久化订阅如何选择
当所有的消息必须被接收则用持久化订阅,当消息丢失能够被容忍则用非持久订阅。
4. ActiveMQ的Broker
4.1 broker是什么
相当于 一个ActiveMQ服务器实例。说白了Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
4.2 启动broker时指定配置文件
启动broker时指定配置文件,可以帮助我们在一台服务器上启动多个broker。实际工作中一般一台服务器只启动一个broker。
4.3 嵌入式的broker启动
用ActiveMQ Broker作为独立的消息服务器来构建Java应用。
ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用。
下面演示如何启动嵌入式的broker
pom.xml添加一个依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
嵌入式broker的启动类
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支持在vm中通信基于嵌入的broker
BrokerService brokerService = new BrokerService();
brokerService.setPopulateJMSXUserID(true);
brokerService.addConnector("tcp://127.0.0.1:61616");
brokerService.start();
}
}
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue_01";
...
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue_01";
...
5. Spring整合ActiveMQ
理解
我们之前介绍的内容也很重要,它更灵活,支持各种自定义功能,可以满足我们工作中复杂的需求。
很多activemq的功能要看官方文档或者博客,这些功能大多是在上面代码的基础上修改完善的。如果非要把这些功能强行整合到spring,就有些缘木求鱼了。而另一种方式整合spring更好,就是将上面的类注入到Spring中,其他不变。这样既能保持原生的代码,又能集成到spring。
下面我们讲的Spring和SpringBoot整合ActiveMQ也重要,它给我们提供了一个模板,简化了代码,减少我们工作中遇到坑,能够满足开发中90%以上的功能。
**pom.xml添加依赖 **
<dependencies>
<!-- ActiveMQ 所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- ActiveMQ 和 Spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- 嵌入式ActiveMQ -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<!-- Spring对JMS的支持,整合Spring和ActiveMQ -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<!-- ActiveMQ连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- Spring核心依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- junit/log4j等基础配置 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
Spring的ActiveMQ配置文件
src/main/resources/spring-activemq.cml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
<!-- 开启包的自动扫描 -->
<context:component-scan base-package="com.polaris"/>
<!-- 配置生产者 -->
<bean id="connectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<!-- 真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供 -->
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://mpolaris.top:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!-- 这个是队列目的地,点对点的Queue -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 通过构造注入Queue名 -->
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 这个是主题目的地, 发布订阅的主题Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- Spring提供的JMS工具类,他可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 传入连接工厂 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 传入目的地 -->
<property name="defaultDestination" ref="destinationQueue"/>
<!-- 消息自动转换器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
队列生产者
@Service
public class JmsProduce {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ioc = new ClassPathXmlApplicationContext("spring-activemq.xml");
JmsProduce produce = (JmsProduce) ioc.getBean("jmsProduce");
produce.jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("====> Spring和ActiveMQ的整合情况");
return message;
}
});
System.out.println("Send task over!");
}
}
队列消费者
@Service
public class JmsConsumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ioc = new ClassPathXmlApplicationContext("spring-activemq.xml");
JmsConsumer consumer = (JmsConsumer) ioc.getBean("jmsConsumer");
String value = (String) consumer.jmsTemplate.receiveAndConvert();
System.out.println(value);
}
}
主题生产者和消费者
只需要修改配置文件目的地即可
<!-- Spring提供的JMS工具类,他可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 传入连接工厂 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 传入目的地 -->
<property name="defaultDestination" ref="destinationTopic"/>
<!-- 消息自动转换器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
配置消费者的监听类
写了一个类来实现消息监听后,只需要启动生产者,消费者不需要启动就自动会监听记录!
<!-- 配置监听程序 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destinationTopic"/>
</bean>
<bean id="myMessageListener" class="com.polaris.queue.MyMessageListener">
</bean>
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
6. SpringBoot整合ActiveMQ
个人不太赞成使用这种方式SpringBoot整合ActiveMQ,因为这样做会失去原生代码的部分功能和灵活性。但是工作中这种方式做能够满足我们常见的需求,也方便和简化我们的代码,也为了适应工作中大家的习惯。
6.1 队列案例 – 生产者点击投递
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<groupId>com.polaris</groupId>
<artifactId>springboot-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
# web占用的端口
server:
port: 8085
spring:
activemq:
# activemq的broker的url
broker-url: tcp://mpolaris.top:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认)=queue true=topic
pub-sub-domain: false
# 自定义队列名称,这只是个常量
myqueue: boot-activemq-queue
ActiveMQ配置类
@Configuration
@EnableJms //开启Jms适配的注解
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
//注入目的地
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
队列消息生产者
@Component
public class QueueProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(queue,"===> SpringBoot + ActiveMQ消息");
}
}
测试类
@SpringBootTest(classes = Application.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Resource //这个是java 的注解,而Autowried是spring 的
private QueueProduce produce;
@Test
public void testSend() {
produce.produceMsg();
}
}
6.2 队列案例 – 生产者间隔定投
QueueProduce新增定时投递方法
/**
* 间隔3秒定时投送
*/
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(queue,"定时投送 => "
+ UUID.randomUUID().toString().substring(0,6));
}
主启动类添加一个注解
@SpringBootApplication
@EnableScheduling //允许开启定时投送功能
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
直接开启主启动类,间隔投递消息
6.3 队列案例 – 消费者监听
@Component
public class QueueCustomer {
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage message) throws JMSException {
System.out.println("消费者收到消息 => " + message.getText());
}
}
6.4 主题基本案例
application.yml配置文件
server:
port: 6666
spring:
activemq:
broker-url: tcp://mpolaris.top:61616
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认)=queue true=topic
pub-sub-domain: true
mytopic: boot-activemq-topic
ActiveMQ配置文件
@Configuration
@EnableJms //开启Jms适配的注解
public class ConfigBean {
@Value("${mytopic}")
private String myTopic;
@Bean
public Topic topic() {
return new ActiveMQTopic(myTopic);
}
}
主题生产者
@Component
public class TopicProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(topic,"===> SpringBoot + ActiveMQ消息");
}
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(topic,"定时投送 => "
+ UUID.randomUUID().toString().substring(0,6));
System.out.println("定时投送");
}
}
主题消费者
@Component
public class QueueCustomer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage message) throws JMSException {
System.out.println("消费者收到消息 => " + message.getText());
}
}
7. ActiveMQ传输协议
7.1 简介
ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM等。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml
中的 标签之内。
activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html
除了tcp和nio协议其他的了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。
注意:协议不同,我们的代码都会不同。
7.2 各协议理解
TCP协议
Transmission Control Protocol(TCP)是默认的,TCP的Client监听端口61616
在网络传输数据前必须要先序列化数据,消息是通过一个叫wire protocol
的来序列化成字节流。默认情况下ActiveMQ把wrie protocol
叫做 OpenWire,它的目的就是促使网络上的效率更高和数据快速交换。
TCP连接的URI形式如:tcp://HostName:port?key=value&key=value
,后面的参数是可选的。
TCP传输的的优点:
-
TCP协议传输可靠性高,稳定性强
-
高效率:字节流方式传递,效率很高
-
有效性、可用性:应用广泛,支持任何平台
关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference
NIO协议
New I/O API Protocol(NIO)。NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
适合使用NIO协议的场景:
-
可能有大量的Client去连接到Broker上,一般情况下大量的Client去连接Broker是被操作系统的线程所限制的。因此NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
-
可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
NIO连接的URI形式:nio://hostname:port?key=value&key=value
关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html
AMQP协议
Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端,中间件,不同产品,不同开发语言等条件限制。
STOMP协议
STOP,Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
GitLub查看MQTT示例代码:https://github.com/fusesource/mqtt-client
7.3 NIO协议案例
ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。
NIO网络IO模型简单配置
修改配置文件activemq.xml
如果你 不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型,所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。如下所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。
<transportConnectors>
<!-- 新增NIO协议 -->
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" /></transportConnectors>
SpringBoot修改端口即可
server:
port: 6666
spring:
activemq:
broker-url: nio://mpolaris.top:61618
user: admin
password: admin
jms:
pub-sub-domain: true
mytopic: boot-activemq-topic
NIO增强
修改activemq.xml配置文件(其实只要auto+nio一条都行了)
auto: 针对所有的协议,他会识别我们是什么协议。
nio:使用NIO网络IO模型
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61626?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1893?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61624?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
</transportConnectors>
修改端口号为61608即可
server:
port: 6666
spring:
activemq:
# broker-url: tcp://mpolaris.top:61608 适配多种协议(注意有些协议代码不一样)
broker-url: nio://mpolaris.top:61608
user: admin
password: admin
jms:
pub-sub-domain: true
mytopic: boot-activemq-topic
8. ActiveMQ的消息存储和持久化
8.1 理解
此处持久化和之前持久性的区别
MQ高可用:事务、持久性、签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久性是MQ的外在表现,现在讲的的持久是是底层实现。
8.2 持久化是什么
官网文档:http://activemq.apache.org/persistence
持久化是什么?一句话就是:ActiveMQ宕机了消息不会丢失的机制。
说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有则会先把存储位置中的消息发出去。
8.3 MQ持久化机制有哪些
AMQ Message Store
基于文件的存储机制,是以前的默认机制,现在不再使用。AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段这个文件会被删除。AMQ适用于ActiveMQ5.3之前的版本。
KahaDB
基于日志文件,从ActiveMQ5.4(含)开始默认的持久化,下面我们详细介绍。
LevelDB消息存储
新兴的技术,现在有些不确定。 官方文档:http://activemq.apache.org/leveldb-store。这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引,默认配置如下:
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
JDBC消息存储
下面我们再详细介绍
JDBC Message Store with ActiveMQ Journal
下面我们再详细介绍
8.4 KahaDB消息存储
理解
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个 事务日志 和仅仅用一个 索引文件 来存储它所有的地址。KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
官网文档:http://activemq.aache.org/kahadb,官网上还有一些其他配置参数。
activemq.xml配置文件
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB存储原理
KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。
db-number.log
KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
db.data
该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。db.free
记录当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的IDdb.redo
用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。lock
文件锁,表示当前kahadb独写权限的broker。
8.4 JDBC消息存储
原理图
配置
添加mysql数据库的驱动包到ActiveMQ的lib文件夹下
在activemq.xml配置文件指定JDBC消息存储
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<!-- dataSource指定将要引用的持久化数据库的bean名称
createTablesOnStartup指定是否在启动的时候创建数据表,默认为true
注意:一般是第一次启动时设置为true,之后改为false -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
在activemq.xml配置文件的标签和 标签之间插入数据库连接池配置
注意:
① 我们需要准备一个mysql数据库,并创建一个名为activemq的数据库
② 默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。
...
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://mpolaris.top:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="maxTotal" value="200" />
<property name="poolPreparedStatements" value="true"/>
</bean>
<import resource="jetty.xml"/>
...
重启activemq会自动生成如下3张表。如果没有自动生成需要我们手动执行SQL。我个人建议要自动生成,我在操作过程中查看日志文件发现了不少问题,最终解决了这些问题后是能够自动生成的。如果不能自动生成说明你的操作有问题。表字段说明如下
- ACTIVEMQ_MSGS 消息数据表
- ACTIVEMQ_ACKS数据表
- ACTIVEMQ_LOCK数据表:表ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker 。
Queue验证和数据表变化
在点对点类型中,当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中。当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。而且点对点类型中消息一旦被Consumer消费,就从数据中删除,消费前的消息会被存放到数据库 上面的消息被消费后被MQ自动删除。
- Queue非持久化模式:不会将消息持久化到数据库
- Queue持久化模式:会将消息持久化到数据库,但是消息被消费者消费后会自动删除持久化数据。
我们使用queue持久化模式发布3条消息后,发现ACTIVEMQ_MSGS数据表多了3条数据。
启动消费者消费了所有的消息后,发现数据表的数据消失了。
Topic验证和说明
设置了持久订阅数据库里面会保存订阅者的信息
ACTIVEMQ_ACKS表中的LAST_ACKED_ID记录了CLIENT_ID最后签收的一条消息,而LAST_ACKED_ID和ACTIVEMQ_MSGS的ID字段是外键关联关系,这样就可以实现Topic的消息保存到ACTIVEMQ_MSGS表内的同时还能根据ACTIVEMQ_ACKS表中的持久订阅者查到该订阅者上次收到的最后一条消息是什么。值得注意的是Topic内的消息是不会被删除的,而Queue的消息在被删除后会在数据库中被删除,如果需要保存Queue,应该使用其他方案解决。
我们启动主题持久化,生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据,消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。这个是要注意的,持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。
总结
如果是Queue,在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息。
如果是Topic,一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。
常见坑
在配置关系型数据库作为ActiveMQ的持久化存储方案时,有许多坑。
-
数据库jar包:注意对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包
-
createTablesOnStartup属性:该属性默认为true,每次启动activemq都会自动创建表,在第一次启动后应改为false避免不必要的损失。
-
下划线:报错”java.lang.IllegalStateException: LifecycleProcessor not initialized”。确认计算机主机名名称没有下划线
8.5 JDBC Message Store with ActiveMQ Journal
理解
这种方式克服了JDBC Store的不足,JDBC每次消息过来都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高
配置(基于JDBC配置稍作修改)
activemq.xml修改
# 修改配置前
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
# 修改配置后(注释掉之前的jdbc配置使用下面的)
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="../activemq-data" />
</persistenceFactory>
8.6 总结
-
Jdbc效率低,KahaDB效率高,Jdbc+Journal效率较高。
-
持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
-
持久化机制演变的过程:从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
-
ActiveMQ消息持久化机制有:
持久化机制 | 特点 |
---|---|
AMQ | 基于日志文件 |
KahaDB | 基于日志文件,从ActiveMQ5.4开始默认使用 |
JDBC | 基于第三方数据库 |
Replicated LevelDB Store | 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。 |
9. ActiveMQ多节点集群
9.1 理解
基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
9.2 三种集群方式
-
基于shareFileSystem共享文件系统(KahaDB)
-
基于JDBC
-
基于可复制的LevelDB
9.3 ZK + Replicated LevelDB Store 案例
Replicated LevelDB Store
是什么:http://activemq.apache.org/replicated-leveldb-store
使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Maste的Slaves。如果Master宕机得到了最新更新的Slave会变成Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如给你配置了replicas=3,name法定大小是(3/2)+1 = 2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success,至于为什么是2-1,阳哥的zookeeper讲解过自行复习。有一个ode要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定mode在线以能够找到拥有最新状态的ode,这个ode才可以成为新的Master。因此,推荐运行至少3个replica nodes以防止一个node失败后服务中断。
部署规划和步骤
- 环境和版本
- 关闭防火墙并保证各个服务器能够ping通
- 具备zk集群并可以成功启动
- 集群部署规划列表
- 创建3台集群目录(就是一台电脑复制三份ActiveMQ)
- 修改管理控制台端口(就是ActiveMQ后台管理页面的访问端口)
- hostname名字映射(如果不映射只需要吧mq配置文件的hostname改成当前主机ip)
- ActiveMQ集群配置
- 配置文件里面的BrokerName要全部一致
- 持久化配置(必须)
- 修改各个节点的消息端口(真实的三台机器不用管)
- 按顺序启动3个ActiveMQ节点,到这步前提是zk集群已经成功启动运行(先启动Zk 在启动ActiveMQ)
- zk集群节点状态说明
- 3台Zk连接任意一台验证三台ActiveMQ是否注册上了Zookeeper
- 查看Master
集群可用性测试
10 ActiveMQ高级特性
10.1 引入消息中间件后如何保证其高可用
zookeeper+Replicated LevelDB
10.2 异步投递Async Sends
异步投递
http://activemq.apache.org/async-sends
对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送。
是什么
ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你 没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步发送:它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能,不过这也带来了额外的问题:就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
自我理解:此处的异步是指生产者和broker之间发送消息的异步。不是指生产者和消费者之间异步。
说明:对于一个Slow Consumer,使用同步发送消息可能出成Producer堵塞等情况,慢消费者适合使用异步发送。(这句话我认为有误)
总结:① 异步发送可以让生产者发的更快。② 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。
参考官网代码实现
异步消息如何确定发送成功?
异步发送丢失消息的场景是:生产者设置userAsyncSend=true
,使用producer.send(msg)
持续发送消息。如果消息不阻塞,生产者会认为所有send
的消息均被成功发送至MQ
。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在此,同步发送等send
不阻塞了就表示一定发送成功了,异步发送需要客户端回执并由客户端再判断一次是否发送成功。
10.3 延迟投递和定时投递
官网说明:http://activemq.apache.org/delay-and-schedule-message-delivery.html
四大属性
案例
要在activemq.xml中配置schedulerSupport属性为true
Java代码里面封装的辅助消息类型:ScheduledMessage
10.4 分发策略
10.5 ActiveMQ消息重试机制
官网说明:http://activemq.apache.org/redelivery-policy
是什么
消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
具体哪些情况会引发消息重发
-
Client用了transactions且再session中调用了rollback
-
Client用了transactions且再调用commit之前关闭或者没有commit
-
Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
请说说消息重发时间间隔和重发次数
-
间隔:1
-
次数:6
-
每秒发6次
有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。
属性说明
10.6 死信队列
官方文档:http://activemq.apache.org/redelivery-policy
是什么
异常消息规避处理的集合,主要处理失败的消息。
使用:处理失败的消息
- 一般生产环境中在使用MQ时设计两个队列:一个核心业务队列,一个死信队列
- 核心业务队列:比如下图专门用来让订单系统发送订单消息的,然后另一个死信队列就是用来处理异常情况的。
- 假如第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。
- 然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,是否请求,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。
死信队列的配置(一般采用默认)
- sharedDeadLetterStrategy
- 不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
- 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.QLQ”,可以通过”deaLetterQueue”属性来设定
<deadLetterStrategy>
<sharedDeadLetterStrategy deaLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
-
individualDeadLetterStrategy
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
属性”useQueueForTopicMessages”,此值表示是否将Topic的DeaLetter保存在Queue中,默认为true
- 自动删除过期消息
过期消息是值生产者指定的过期时间,超过这个时间的消息
- 存放非持久消息到死信队列中
10.7 消息不被重复消费,幂等性问题
之后回来完善
activemq的API文档:http://activemq.apache.org/maven/apidocs/index.html