全网最详细RabbitMQ笔记包括如何安装环境【RabbitMQ】RabbitMQ + Spring Boot集成零基础入门(基础篇)
RabbitMQ 是一个开源的消息队列中间件,作为消息代理(Message Broker)实现,它负责在不同的应用程序、服务或组件之间传递消息。RabbitMQ 实现了 AMQP(Advanced Message Queuing Protocol)协议,支持可靠的消息传递,确保消息能够在系统中被可靠、顺序地传递,并支持异步和解耦合的通信方式。其中消息传递的基本模型可以分为点对点(Point-to-
一、初始Rabbitmq
1、什么是Rabbitmq,它的概述是什么?
RabbitMQ 是一个开源的消息队列中间件,作为消息代理(Message Broker)实现,它负责在不同的应用程序、服务或组件之间传递消息。RabbitMQ 实现了 AMQP(Advanced Message Queuing Protocol)协议,支持可靠的消息传递,确保消息能够在系统中被可靠、顺序地传递,并支持异步和解耦合的通信方式。 其中消息传递的基本模型可以分为 点对点(Point-to-Point) 和 发布/订阅(Publish/Subscribe) 两种类型。
2、RabbitMQ的应用场景
-
异步处理: RabbitMQ 非常适合需要异步处理的场景。例如,在电商网站中,订单生成后需要异步处理支付、发货等任务,生产者将消息发送到 RabbitMQ,消费者异步处理这些任务。
-
分布式系统通信解耦: RabbitMQ 可以用作分布式系统中各个服务之间的通信媒介,服务之间通过消息队列进行解耦,避免直接调用和依赖。
-
事件驱动架构(EDA): 在事件驱动架构中,RabbitMQ 可以作为事件总线,消费者根据事件类型处理不同的业务逻辑。
-
日志收集与分析: RabbitMQ 可以用于收集和转发日志信息,将日志从不同的系统组件传递到中央日志处理系统。
3、RabbitMQ主要组件
-
publisher:生产者,也就是发送消息的一方
-
consumer:消费者,也就是消费消息的一方
-
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
-
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
-
Binding(绑定):交换机和队列之间的联系,定义了交换机将消息路由到队列的规则。
-
Message(消息):在生产者和消费者之间传递的数据,通常包括消息体和一些元数据(如路由键、消息头等)。
4、RabbitMQ 的优点
- 解耦合:生产者和消费者之间不直接交互,它们通过队列进行消息交换,从而解耦应用程序的各个组件。
- 异步处理:RabbitMQ 支持异步消息传递,允许系统在接收请求时立即响应,并在后台处理任务。
- 可靠性:支持消息持久化、消息确认等功能,确保消息不会丢失。
- 扩展性:通过集群和镜像队列,RabbitMQ 可以扩展到多个节点,提供高可用性和负载均衡。
- 多协议支持:虽然 RabbitMQ 基于 AMQP,但它还可以通过插件支持其他协议,如 MQTT、STOMP 等。
5、与其他消息队列性能比较
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好,因此我们课堂上选择RabbitMQ来学习。
二、RabbitMQ环境安装初始化
RabbitMQ详情安装教程地址:教程安装地址
三、SpringAMQP+RabbitMQ实战入门(基本API)
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
Spring AMQP 的官方地址: Spring AMQP SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
1、实战入门(Java API)
导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一、WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息.
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
1)声明队列
@Configuration
public class WorkQueues{
/**
* 第1个队列
*/
@Bean
public Queue workQueues(){
return new Queue("work.queue");
}
}
2)消息发送
这次我们往队列中循环发送,模拟出一个大量消息堆积的队列。
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "work.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
3) 消息接收
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法(其中**@RabbitListener**用来监听队列):
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
消费者2 sleep了200毫秒,相当于每秒处理5个消息
4)测试
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。 最终结果如下:
消费者1和消费者2竟然每人消费了25条消息:
消费者1很快完成了自己的25条消息
消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。**导致1个消费者空闲,另一个消费者忙的不可开交。**没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
5)能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
在配置中,prefetch: 1
表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询(RabbitMQ默认是轮询)。它可以帮助保证每个消息在被消费者处理时都能得到较为均匀的分配,避免某个消费者处理速度慢而导致其他消费者空闲的情况。如果不配置的话,那么RabbitMQ采用的就是一个公平轮询的方式,将消息依次发给一个消费,等他消费完了再发下一个给另外的消费者
二、交换机类型
之前的WorkQueues模型并没有交换机,引入交换机后,消息发送和接收的模式就会有很大的变化,模型如下所示
-
Publisher:生产者,不再发送消息到队列中,而是发给交换机
-
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange选的类型。
-
Queue:消息队列,接收消息、缓存消息。不过队列一定要与交换机绑定。
-
Consumer:消费者,订阅队列。
1、Fanout交换机
Fanout,在MQ中叫广播,在广播的模式下,消息发送的流程如下图所示
其主要特点:
1) 可以有多个队列
2) 每个队列都可以绑定到Exchange(交换机)
3) 生产者发送的消息,只能发送到交换机
4) 交换机把消息发送给绑定过的所有队列
5) 订阅队列的消费者都能拿到消息
案例演示
1)声明交换机和对应的队列,并进行绑定
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("xuyuan.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
2)生产者发送消息
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "xuyuan.fanout";
// 消息
String message = "hello, xuyuan ,nihao!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
3)消费者接收消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
分析总结:
交换机的作用主要是:
- 接收对应Publisher发送的消息
- 将消息按照规则路由道与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange会将消息路由到每个绑定的队列。
2、Direct交换机
场景:在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定的,需要指定一个Routingkey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
- Exchange不在把消息发送给每一个绑定的队列,而是根据消息的RoutingKey来发送到指定的队列中去。然后消费者监听对应的队列得到消息。
案例如下:
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
3、Topic交换机
分析:
使用Direct可以根据对应的RoutingKey路由到指定的队列,但是对于多元组就比较麻烦,只能一个一个绑定对应的RoutingKey,此时Topic交换机就派上用场了,可以同时划分一个子组,一个消息可以根据一个组别的队列进行投递,就需要用到Topic交换机
Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以“ .”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:只匹配1个词
举例子:
- xuyuan.# :它能够匹配xuyuan.com 或者xuyuan.xxx.xx等等
xuyuan.* :只能匹配xuyuan.aa 或者xuyuan.xx
1)初始化
@Configuration
public class Topic {
/**
* 声明交换机
* @return Topic类型交换机
*/
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange("xuyuan.topic").build();
}
/**
* 第1个队列
*/
@Bean
public Queue topicQueue1(){
return new Queue("topic.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue topicQueue1, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("xuyuan.new");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue topicQueue1, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("xuyuan.#");
}
/**
* 第2个队列
*/
@Bean
public Queue topicQueue2(){
return new Queue("topic.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue topicQueue2, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("ooyl.*");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue topicQueue2, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("ooyl.#");
}
}
2)消息发送
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "许苑2上某个人";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "xuyuan.news", message);
}
3)消息接收
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
总结
下Direct交换机与Topic交换机的差异如下:
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
三、基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
是不是简单多了。 再试试Topic模式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
四、保证消息安全可靠(进阶)
1、消息丢失的情况
消息丢失的情况主要有以下三种:
- 生产者向消息代理传递的过程中,消息丢失了
- 消息代理把消息弄丢了
- 消费者把雄安锡弄丢了
过程如图所示
因此想要保证消息的安全可靠,必须从消息丢失的情况下入手。
2、生产者的可靠性保证
2.1 生产者重连
由于以下因素,网络出现问题,导致客户端连接RabbitMQ失败,造成信息发送失败。
解决办法:设置重连机制
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 如果设置有密码和账号,请把注释去掉
# username: xuyuan
# password: 1234
connection-timeout: 1s # 连接超时时间
template:
retry:
enabled: true # 开启连接超时重试机制
initial-interval: 1000ms # 连接失败后的初始等待时间
multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
测试:将RabbitMQ 服务停止,启动程序
2.2 生产者确认
RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功。
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功。
- 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功。
- 其它情况都会返回 NACK,告知生产者消息投递失败
生产者确认机制的代码实现
配置编写
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
其中publisher-confirm-type 一共有三种模式:
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息
通过集成Spring Boot的RabbitMQ,配置rabbitTemplate的回调函数
注意:每个 RabbitTemplate 只能配置一个 ReturnCallback,等生产者发送消息成功返回 ACK后会触发此回调函数。
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置回调
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
System.out.println("收到消息的return callback, " +
"exchange = " + returnedMessage.getExchange() + ", " +
"routingKey = " + returnedMessage.getRoutingKey() + ", " +
"replyCode = " + returnedMessage.getReplyCode() + ", " +
"replyText = " + returnedMessage.getReplyText() + ", " +
"message = " + returnedMessage.getMessage());
});
}
}
测试运行RabbitMQ
@Test
void testConfirmCallback() throws InterruptedException {
CorrelationData correlationData = new CorrelationData();
correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
if (confirm.isAck()) {
// 消息发送成功
System.out.println("消息发送成功,收到ack");
} else {
// 消息发送失败
System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());
}
if (throwable != null) {
// 消息回调失败
System.err.println("消息回调失败");
}
});
rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);
// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
Thread.sleep(2000);
}
发送成功后可以看到消息发送成功的回调信息
如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果
如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果
可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag 为 0 表示消息无法路由到队列)
分析总结:
对于进行生产者的确认信息总结如下:
- 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用。
- 如果一定使用,对于返回的nack的消息,可以尝试重新投递,如果依然是失败的,就记录异常信息。
- 对于publisher-return 机制,不需要开启,路由失败,一般都是业务造成的问题。
3、消息代理(RabbitMQ)的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中用来降低消息收发的延迟,这样会导致发送问题:
- 如果RabbitMQ宕机,内存中的消息会丢失。
- 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞?
当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息。
3.1 数据持久化
RabbitMQ 实现数据持久化包括 3 个方面:
- 交换机持久化。
- 队列持久化。
- 消息持久化。
注意:
利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)
发送非持久化信息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}
发送持久化信息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}
4、 消费者的可靠性
4.1 消费者确认机制
为了确认消费是否成功处理消息,RabbitMQ提供了消费者确认机制。消费者消费消息后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息。
SpringAMQP ACK 处理方式,有三种方式
SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
- none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
- auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
- 如果是业务异常,会自动返回 unack,此时消息会重新入队,重新发送
- 如果是消息处理或校验异常,自动返回 reject
测试实例如下 (只进行auto测试)
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
throw new MessageConversionException("故意抛出异常");
}
在控制台可以看到,消息被拒绝了,而且消息也没有重新发送,此时去RabbitMQ控制台查看,可以发现消息已经从队列中移除。
4.2 失败重试机制
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入无限循环,给 RabbitMQ 带来不必要的压力
我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队
在 application.yml 配置文件中开启失败重试机制
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消息消费失败重试机制
initial-interval: 1000ms # 消息消费失败后的初始等待时间
multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需
4.3 重试耗尽,失败消息的处理策略
开启重试模式后,如果重试次数耗尽后,消息依然处理失败,就需要由 MessageRecoverer 接口来处理,MessageRecoverer 有对应的三个实现类:
- RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,Spring Boot AMQP默认策略。
- ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机**(死信队列)**。
案例:( RepublishMessageRecoverer 类情况)
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct", true, false);
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true, false, false);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
}
// 定义失败重试策略
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
发送与接收消息
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
throw new RuntimeException("故意抛出异常");
}
在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列
4.4 业务幂等性
消息幂等性:保证每条消息不会被同一个消费者重复消费。
4.4.1 方案一:为每条消息设置一个唯一的 id
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
- 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
- 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理。
4.4.2 方案二:结合业务判断
5、延迟消息
5.1 什么是延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务
5.2 死信队列
5.2.1概述
死信队列(Dead Letter Queue, DLQ)是一种消息队列系统中的特殊队列,用于存储那些无法正常处理的消息。这些消息被称为“死信”(Dead Letters),因为它们在正常的处理流程中遇到了问题,无法被成功消费。DLQ的主要目的是防止这些有问题的消息阻塞或干扰系统的正常运行,并提供一个地方来分析和处理这些问题消息。
5.2.2 成为死信队列条件
一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false。
- 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费。
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
利用死信交换机的特点,可以实现发送延迟消息的功能。
自定义死信队列不推荐,故而不再展示实例
5.3 延迟消息插件(推荐使用)
5.3.1 下载并安装延迟插件
RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中
插件的下载地址:rabbitmq-delayed-message-exchange
5.3.3 在 Java 代码中发送延迟消息
声明延迟交换机
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}
声明队列和延迟交换机,并将队列和延迟交换机绑定在一起
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),
key = "delay"
))
public void listenDelayQueue(String message) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}
编写测试方法,测试发送延迟消息
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000); // 毫秒
return message;
}
});
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}
5.4延迟消息应用的场景
取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源。

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)