19. RabbitMQ - 死信队列
大约 3 分钟
1. 引言
本文代码已上传至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringBoot-RabbitMQ-Demo.git
死信队列听上去像 消息“死”了 ,其实也有点这个意思,我们也可以称他为“备胎队列”。
死信队列是当消息在一个队列因为以下原因产生的:
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了。
可以看下流程图:
2. 项目整合
注意:死信交换机和配置,只在初始化的时候配置,如果之前配置过相同的交换机,需要先删除。如下:删除fanoutExchange
。
1.生产者配置
@Component
public class DeadFanOutConfig {
/**
* 定义死信队列相关信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
// 邮件队列
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
// 短信队列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
// fanout 交换机
private String EXCHANGE_NAME = "fanoutExchange";
// 1.定义邮件队列
@Bean
public Queue fanOutEamilQueue() {
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
return queue;
}
// 2.定义短信队列
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
// 2.定义交换机
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
// 3.队列与交换机绑定邮件队列
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.队列与交换机绑定短信队列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
/**
* 配置死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
}
2.消费者配置
@Component
public class FanoutEamilConsumer {
/**
* 死信队列演示
*/
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
JSONObject jsonObject = JSONObject.parseObject(msg);
Integer timestamp = jsonObject.getInteger("timestamp");
try {
int result = 1 / timestamp;
System.out.println("result:" + result);
// 通知mq服务器删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
// // 丢弃该消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
3.死信消费者配置:
@Component
public class DeadConsumer {
@RabbitListener(queues = "dead_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("死信邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
4.启动生产者,浏览器访问: http://localhost:8081/sendFanout?queueName=fanout_email_queue,可以看到:
5.启动消费者
6.浏览器访问: http://localhost:8081/sendFanout?queueName=fanout_email_queue ,可以看到消费者先自动补偿,补偿结束后,还是失败,然后拒绝消息,发送给死信队列了。
自动补偿:
失败放到死信队列:
最终由死信队列来获取消息: