六、死信队列
创始人
2024-05-30 01:21:42
0

1、死信的概念

2、死信来源

3、死信实战

3.1 代码架构

在这里插入图片描述

  • 正常队列绑定正常交换机
  • 正常队列绑定死信交换机
  • 死信队列绑定死信

3.2 消息TTL过期变成死信

生产者向 normal_exchange发送消息,通过路由键zhangsan路由到 normal-queue中,消息设置TTL属性

/*** @author houChen* @date 2022/11/12 20:09* @Description: 死信队列实战: ttl* 生产者* 设置消息具有过期时间属性,当消息过期后会经过死信交换机路由到死信队列*/
public class Producer {private static final String NORMAL_EXECAGE = "normal_exchange";public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//1、创建一个交换机channel.exchangeDeclare(NORMAL_EXECAGE, "direct");//设置消息的TTL时间AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXECAGE, "zhangsan", basicProperties, message.getBytes());System.out.println("生产者发送消息:" + message);}}}
}

消费者c1不启动,模拟消息在normal-queue中逗留超过10s,导致消息过期,经过dead_exchange路由到dead-queue

/*** @author houChen* @date 2022/11/12 20:33* @Description: 消费者c1代码*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//正常队列绑定死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);Map params = new HashMap<>();//key是固定的params.put("x-dead-letter-exchange", DEAD_EXCHANGE);params.put("x-dead-letter-routing-key", "lisi");String normalQueueName = "normal-queue";channel.queueDeclare(normalQueueName, false, false, false, params);channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "utf-8");System.out.println("Consumer01接收到消息:" + message);};channel.basicConsume(normalQueueName, false, deliverCallback, consumerTag -> {});}
}

消费者c2消费死信队列中的消息

/*** @author houChen* @date 2022/11/12 21:11* @Description: Consumer02 会消费死信队列的消息*/
public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列,绑定String deadQueueName = "dead-queue";channel.queueDeclare(deadQueueName, false, false, false, null);channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");System.out.println("等待死信接收消息");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "utf-8");System.out.println("Consumer02接收死信队列消息:" + message);};channel.basicConsume(deadQueueName, true, deliverCallback, consumerTag -> {});}
}

启动生产者和消费者c2,发现经过10s后,消费者c2消费到生产者生产的消息,表明normal-queue中的消息过期后,确实经由dead-exchange 路由到dead-queue
在这里插入图片描述

3.3 队列达到最大长度

当 队列达到最大长度后,再往队列中投递消息时,消息会变成死信

1) 消息生产者代码去掉 TTL 属性:

/*** @author houChen* @date 2022/11/12 20:09* @Description: 死信队列实战: 队列达到最大长度导致消息进入死信队列* 生产者*  去掉消息的过期属性*/
public class Producer {private static final String NORMAL_EXECAGE = "normal_exchange";public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//1、创建一个交换机channel.exchangeDeclare(NORMAL_EXECAGE, "direct");for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXECAGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}}
}

2) C1 消费者给 normal-queue 添加最大长度的属性 (启动之后关闭该消费者 模拟其接收不到消息)
【注意】 此时需要在RabbitMQ的控制台将 normal-queue删除,不然创建队列会报错

/*** @author houChen* @date 2022/11/12 20:33* @Description: 消费者c1代码*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//正常队列绑定死信交换机Map params = new HashMap<>();//key是固定的//设置死信交换机params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信路由键params.put("x-dead-letter-routing-key", "lisi");//设置正常队列的最大长度params.put("x-max-length", 6);//声明正常队列绑定正常交换机String normalQueueName = "normal-queue";channel.queueDeclare(normalQueueName, false, false, false, params);channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "utf-8");System.out.println("Consumer01接收到消息:" + message);};channel.basicConsume(normalQueueName, true, deliverCallback, consumerTag -> {});}
}

3)测试结果
消费者生产 10 条消息后,有 6 条消息进入dead-queue
在这里插入图片描述

3.4 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

1) 消息生产者代码同上生产者一致

2)C1 消费者代码 : 对某个消息进行拒绝

/*** @author houChen* @date 2022/11/12 20:33* @Description: 消费者c1代码** 对 某个消息拒绝签收,并且不重新入队*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//正常队列绑定死信交换机Map params = new HashMap<>();//key是固定的params.put("x-dead-letter-exchange", DEAD_EXCHANGE);params.put("x-dead-letter-routing-key", "lisi");//声明正常队列,绑定, 已经String normalQueueName = "normal-queue";channel.queueDeclare(normalQueueName, false, false, false, params);channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "utf-8");if(message.equals("info5")) {System.out.println("Consumer01接收到消息:" + message + ",并拒绝签收该消息");//param1: 消息的标记   param2: 被拒绝的消息是否重新入队channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);} else {System.out.println("Consumer01接收到消息:" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};channel.basicConsume(normalQueueName, false, deliverCallback, consumerTag -> {});}
}

3)C2 消费者代码不变
启动消费者 1 然后再启动消费者 2

4)结果
消费者c1,会拒绝消费消息info5,并且拒绝消息重新入队
在这里插入图片描述
被拒绝消息会路由到 dead-queue
在这里插入图片描述

相关内容

热门资讯

电视安卓系统哪个品牌好,哪家品... 你有没有想过,家里的电视是不是该升级换代了呢?现在市面上电视品牌琳琅满目,各种操作系统也是让人眼花缭...
安卓会员管理系统怎么用,提升服... 你有没有想过,手机里那些你爱不释手的APP,背后其实有个强大的会员管理系统在默默支持呢?没错,就是那...
安卓系统软件使用技巧,解锁软件... 你有没有发现,用安卓手机的时候,总有一些小技巧能让你玩得更溜?别小看了这些小细节,它们可是能让你的手...
安卓系统提示音替换 你知道吗?手机里那个时不时响起的提示音,有时候真的能让人心情大好,有时候又让人抓狂不已。今天,就让我...
安卓开机不了系统更新 手机突然开不了机,系统更新还卡在那里,这可真是让人头疼的问题啊!你是不是也遇到了这种情况?别急,今天...
安卓系统中微信视频,安卓系统下... 你有没有发现,现在用手机聊天,视频通话简直成了标配!尤其是咱们安卓系统的小伙伴们,微信视频功能更是用...
安卓系统是服务器,服务器端的智... 你知道吗?在科技的世界里,安卓系统可是个超级明星呢!它不仅仅是个手机操作系统,竟然还能成为服务器的得...
pc电脑安卓系统下载软件,轻松... 你有没有想过,你的PC电脑上安装了安卓系统,是不是瞬间觉得世界都大不一样了呢?没错,就是那种“一机在...
电影院购票系统安卓,便捷观影新... 你有没有想过,在繁忙的生活中,一部好电影就像是一剂强心针,能瞬间让你放松心情?而我今天要和你分享的,...
安卓系统可以写程序? 你有没有想过,安卓系统竟然也能写程序呢?没错,你没听错!这个我们日常使用的智能手机操作系统,竟然有着...
安卓系统架构书籍推荐,权威书籍... 你有没有想过,想要深入了解安卓系统架构,却不知道从何下手?别急,今天我就要给你推荐几本超级实用的书籍...
安卓系统看到的炸弹,技术解析与... 安卓系统看到的炸弹——揭秘手机中的隐形威胁在数字化时代,智能手机已经成为我们生活中不可或缺的一部分。...
鸿蒙系统有安卓文件,畅享多平台... 你知道吗?最近在科技圈里,有个大新闻可是闹得沸沸扬扬的,那就是鸿蒙系统竟然有了安卓文件!是不是觉得有...
宝马安卓车机系统切换,驾驭未来... 你有没有发现,现在的汽车越来越智能了?尤其是那些豪华品牌,比如宝马,它们的内饰里那个大屏幕,简直就像...
p30退回安卓系统 你有没有听说最近P30的用户们都在忙活一件大事?没错,就是他们的手机要退回安卓系统啦!这可不是一个简...
oppoa57安卓原生系统,原... 你有没有发现,最近OPPO A57这款手机在安卓原生系统上的表现真是让人眼前一亮呢?今天,就让我带你...
安卓系统输入法联想,安卓系统输... 你有没有发现,手机上的输入法真的是个神奇的小助手呢?尤其是安卓系统的输入法,简直就是智能生活的点睛之...
怎么进入安卓刷机系统,安卓刷机... 亲爱的手机控们,你是否曾对安卓手机的刷机系统充满好奇?想要解锁手机潜能,体验全新的系统魅力?别急,今...
安卓系统程序有病毒 你知道吗?在这个数字化时代,手机已经成了我们生活中不可或缺的好伙伴。但是,你知道吗?即使是安卓系统,...
奥迪中控安卓系统下载,畅享智能... 你有没有发现,现在汽车的中控系统越来越智能了?尤其是奥迪这种豪华品牌,他们的中控系统简直就是科技与艺...