RabbitMQ的死信队列解读

发布于:2024-10-24 编辑:匿名 来源:网络

基本介绍了什么是死信开关。定义业务队列时,必须考虑指定死信开关。

死信交换机可以绑定到任意普通队列,然后出现在业务队列中。当发生死信时,数据将被发送到死信队列。

什么是死信队列?死信队列其实就是一个普通的队列,只不过这个队列绑定了一个死信交换器,用来存储死信。 RabbitMQ中有一个交换叫DLX,全称为Dead-Letter-Exchange。

,可以称为死信交换器。当消息在队列中成为死消息时,它将被重新发送到另一个交换器。

这个交易所是DLX。绑定到DLX的队列称为死信队列。

需要说明的是,DLX也是一个普通的交换机,和一般的交换机没有什么区别。可以在任何队列上指定,它实际上设置了某个队列的属性。

当这个队列中出现死信时,RabbitMQ会自动将消息重新发布到设定的DLX上,然后路由到另一个队列,即死信队列。当消息进入死信队列时,消息过期代码语言:javascript copy MessageProperties messageProperties=new MessageProperties();//设置这条消息的过期时间为10秒 messageProperties.setExpiration("0");队列过期代码语言:javascript Copy Map arguments =new HashMap<>(); //指定死信交换并设置arguments.put("x-dead-letter-exchange",EXCHANGE_DLX); //设置死信路由key,value为死信开关和死信队列绑定的keyarguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY); //队列的过期时间arguments.put( "x-message-ttl",0);return new Queue(QUEUE_NORMAL,true,false,false,arguments); TTL:Time to Live 的缩写,过期时间队列达到最大长度(最先排队的消息将被发送到 DLX) 代码语言:javascript copy Map Arguments = new HashMap( );//设置队列的最大长度,相反的消息会被挤出,成为坏信arguments.put("x -max-length", 5);消费者拒绝消息,从普通队列接收消息,不重新投递,但不确认消息,也不重新投递消息。

此时,消息进入死信队列。application.yml 启动手动确认代码 language: javascript copy spring:rabbitmq:listener: simple:acknowledge-mode:manual code language: javascript copy /** * 正在监听正常的队列名称,不是死信queue * 我们从普通队列接收消息,但不确认消息,也不重新投递消息。

此时消息进入死信队列* *通道消息通道(是一个连接下的消息通道,一个连接下有多个消息信息,发送/接收消息都是通过通道完成的) */ @RabbitListener( queues = {RabbitConfig.QUEUE}) public void process(Message message, Channel channel) { System.out.println("收到消息:" + message); //不确认消息,ack这个词的意思是确认 try { System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag()); //手动启用rabbitm消息消费确认模式然后编写这样的代码; Channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); void basicNack(long DeliveryTag, boolean multiple, boolean requeue) DeliveryTag:消息的数字标签 multiple:翻译成中文就是多个,如果为true则表示对于所有小于 DeliveryTag 标签的消息,不会确认 Nack。 False 表示只有带有当前的deliveryTag 标签的消息才会被Nacked。

Requeue:如果为true,表示消息被Nacked后会重新发送到队列中。如果为 false,则消息被 Nacked 后不会重发。

发送给队列消费者拒绝消息,开启手动确认模式,拒绝消息而不重新投递,则进入死信队列 代码语言:javascript copy /** * 正常监听的队列名称,不是死信队列 * 我们从普通队列接收消息,但不确认消息,也不重新投递消息。此时消息进入死信队列**通道消息通道(是连接下的消息通道,一个连接下有多个消息信息,发送/接收消息都是通过通道完成的)*/ @RabbitListener(queues) = {RabbitConfig.QUEUE}) public void process(Message message, Channel channel) { System.out.println("收到消息:" + message);尝试 { System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag()); //要开启rabbitm消息消费的手动确认模式,那么代码是这样写的; Channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); Channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); basicReject 为负数 一般在消费消息发生异常时执行投递。

可以丢弃消息或者重新排序来重新处理消息 参数 1:消费者消息的索引参数 2:处理异常消息,true 表示重新排序,false 表示丢弃 Reject 拒绝消息时,可以使用 requeue 标志来告诉 RabbitMQ 是否需要重新发送给其他消费者。如果为false,则不会重发,一般消息会被RabbitMQ丢弃。

拒绝一次只能拒绝一封邮件。如果为 true,则消息将被重新传送。

Nack 与 Reject 类似,只不过它可以一次拒绝多条消息。您还可以使用重新排队标志,它是 AMQP 规范的 RabbitMQ 扩展。

通过RejectRequeuConsumer可以看到,无论是使用Reject方法还是Nack方法,当requeue参数设置为true时,都会重新投递消息。当 requeue 参数设置为 false 时,消息将丢失。

Springboot代码实战架构编辑如上图。消息到达正常交换机exchange.nomal.a。

通过绑定到普通队列queue.noaml.a,消息将到达普通队列。如果消息死了,稍后会转发。

到与普通队列绑定的死信交换机,死信交换机会将其转发到与其绑定的死信队列queue.deal.a。项目概述 项目采用springboot架构,主要用到的依赖有: 代码语言:javascript copy org.springframework.boot spring-boot -starter-amqp org.projectlombok lombok application.yml 配置文件如下: 代码语言:javascript复制服务器:端口:spring:rabbitmq:主机:..70。

端口: 用户名: admin 密码: 56 虚拟主机: /RabbitConfigDeal 配置类: 创建队列和交换机并绑定它们 代码语言: javascript copy @Configurationpublic class RabbitConfigDeal {} create 普通交换机 代码语言: javascript copy @Bean public DirectExchange normalExchange() { return ExchangeBuilder.directExchange("exchange.normal.a").build();创建死信开关代码语言: javascript copy @Bean public DirectExchange deadExchange() { return ExchangeBuilder.directExchange("exchange.dead.a").build(); } 创建死信队列 代码语言:javascript copy @Bean public Queue deadQueue(){ return QueueBuilder.durable("queue.dead.a").build() ;创建一个普通队列,设置其绑定的死信开关,以及对应的绑定路由键为命令码语言: javascript copy @Bean public Queue normalQueue(){ Map argument =new HashMap<>( );参数.put("x-message-ttl",0); argument.put("x-dead-letter-exchange","exchange.dead.a"); argument.put("x-dead-letter-routing-key","order"); return QueueBuilder.durable("queue.normal.a") .withArguments(arguments).build();绑定普通交换机和普通队列代码语言: javascript copy @Bean public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");绑定死信交换和死信队列代码语言:javascript copy @Bean public BindingbindingDeal(DirectExchange deadExchange,Queue deadQueue){ return BindingBuilder.bind(deadQueue).to(deadExchange).with("order"); } }MessageService业务类:发送消息和接收消息 代码语言:javascript copy @Component@Slf4jpublic class MessageService { @Resource private RabbitTemplaterabbitTemplate;} 发送消息方法代码语言:javascript copy public void sendMsg(){ //添加消息属性 Message message = MessageBuilder.withBody("你好词!".getBytes(StandardCharsets.UTF_8)) .build( ); rabbitTemplate.convertAndSend("exchange.normal.a","订单",message); log.info("消息发送时间:{}", new Date());这里使用的路由key是info MessageConvert涉及到应用程序序列化对于网络传输是不可避免的。发送端按照一定的规则将消息转换为字节数组进行发送,接收端按照约定的规则对byte[]数组进行解析。

RabbitMQ的序列化是指Message的body属性,也就是我们真正需要传输的东西,RabbitMQ抽象了一个MessageConvert接口来处理消息的序列化。其实有SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等接受消息代码语言: javascript copy @RabbitListener(queues = {"queue.dead.a"}) public void receiveMsg(Message message){ byte[] body = message.getBody(); String queue = message.getMessageProperties().getConsumerQueue(); String msg=new String(body); log.info ("{}收到消息时间:{},消息为{}",queue,new Date(),msg); } Message 在消息传递过程中,实际传递的对象是 org.springframework.amqp.core .Message,主要由两部分组成: MessageProperties // 消息属性 byte[] body // 消息内容 @RabbitListener 使用@RabbitListener 注解来标记该方法,当调试队列中有消息时,就会接收并处理消息处理。

方法中的参数通过MessageConverter进行转换,如果使用自定义的MessageConverter,则需要在RabbitListenerContainerFactory实例中设置消息的content_type属性(Spring默认使用的是SimpleRabbitListenerContainerFactory)。身体数据被存储。

除了使用Message对象接收消息(包括消息属性等信息)之外,还可以直接使用对应的类型接收消息体内容,但如果方法参数类型不正确,会抛出异常: application/octet-stream:二进制字节数组存储,使用byte[]application/ x-java-serialized-object:Java对象序列化格式存储,使用Object和对应的类型(反序列化时类型应与包同名,否则会抛出类未找到异常)text/plain:文本数据类型存储,使用Stringapplication/json:JSON格式,使用Object,对应类型启动Animal class RabbitMq01Application:实现ApplicationRunner接口代码语言:javascript copy/** * @author 风清云丹*/@SpringBootApplicationpublic class RabbitMq01Application Implements ApplicationRunner { public static void main(String[] args) { SpringApplication.run(RabbitMq01Application.class ,参数); @Resource 私有 MessageService 消息服务; /** * 程序启动时就会调用该方法* @param args * @throws Exception */ @Override public void run(ApplicationArguments args) throws Exception { messageService.sendMsg();在SpringBoot中,提供了一个接口:ApplicationRunner。在这个界面中,只有一个run方法。

它的执行时机是:spring容器启动后,会立即执行该接口实现类的run方法。由于该方法是在容器启动后执行的,因此可以从spring容器中获取其他注入的bean。

启动主启动类后查看控制台:代码语言:javascript 复制09-28 10:46:17。 INFO 0 --- [main] c.e.rabbitmq01.service.MessageService:发送消息时间:Thu Sep 28 10:46:17 CST 09-28 10:46:37。

INFO 0 --- [ntContainer#0-1] c.e.rabbitmq01.service.MessageService:queue.dead.a 收到消息时间: 9 月 28 日星期四 10:46:37 CST,消息 问候语!这里我们可以看到消息是在17s发送的。 20s后,即37s时,我们收到了死信队列queue.dead.a中的消息。

?????????????????????????????我参加腾讯科技创造特训营第二期有奖征文比赛瓜分奖金池10000元和键盘手表一块。

RabbitMQ的死信队列解读

站长声明

版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

标签:

相关文章

  • 极氪表示“不会造无聊的汽车”,那么它会成为未来出行的标杆吗?

    极氪表示“不会造无聊的汽车”,那么它会成为未来出行的标杆吗?

    以纯电动出行为使命,极氪智能科技成立。 它有集团制造能力的背书,还有初创品牌自身天马行空的想象力,这让它有底气说“不做无聊的车”。 对严格来说,极氪是一个年轻的品牌,但其在新能源领域的思考并不肤浅,安聪慧亲自带队担任极氪智能科技CEO,意义非凡。 极氪的起点高,

    06-21

  • 【创业24小时】2022年1月5日

    【创业24小时】2022年1月5日

    2022年1月5日投融资昨天国内市场共发生18起投资披露事件,其中先进制造6起(中润化工、博瑞吉芯、芯谷微电子、东方能源、星辉新材料、昕原半导体)、医疗健康案例5个(顶级制药、耀明药业、好孕妈妈、赛岚制药、金医盛世)、企业服务3个(超参科技、SphereEx、联泰集群)、本

    06-18

  • 雷锋网2019年度“人工智能最佳掘金案例榜单”正式揭晓

    雷锋网2019年度“人工智能最佳掘金案例榜单”正式揭晓

    如果说过去的四年是人工智能商业化进程的上半场,那么即将结束的这一年已正式引领行业进入下半场。 。 四年前,当工业界和学术界怀揣着用AI打造全新互联网级To C市场的梦想时,四年后的今天,人工智能的主战场早已转移到远离现实的传统行业。 科技感。 三年前,当业界开始认识

    06-17

  • 爆买3700亿,年轻人沉迷彩票

    爆买3700亿,年轻人沉迷彩票

    你幻想过一夜暴富吗? 很多人肯定都想过,尤其是涉世不深的年轻人。 刚毕业找工作的时候,我常常躺在床上想象,如果余额宝突然多了两个零,我要怎么花? 我想买一台顶级的电脑,建一个游戏室,安装各种炫酷的设备。 然后继续幻想,如果还有多个0,怎么花呢? 不,我什么也想不

    06-18

  • 伟创微电子获1亿元A+轮融资,上海国房建设领投

    伟创微电子获1亿元A+轮融资,上海国房建设领投

    投资圈(ID:pedaily)据10月13日消息,伟创微电子(上海)有限公司(以下简称“伟创微电子”) “伟创微电子”)近日宣布完成A+轮融资,融资金额超亿元。 本轮融资由上海国方建设领投,老股东浦东科创、华登国际、湖山资本、正轩投资、林鑫投资持续增持。 伟创微电子成立于今

    06-17

  • 【创业24小时】2022年4月28日

    【创业24小时】2022年4月28日

    投融资昨天,国内市场共发生12起投资披露事件,其中先进制造3起(成岭微电子、欧冶半导体、华引芯)、智能硬件案例3起(桂花网络决明子、捷享灵月、施比特机器人)、医疗健康案例1个(爱瑞奇迹)、本地生活案例1个(DOC咖啡)、娱乐媒体案例1个(卢克星球)、企业服务案例1个

    06-18

  • 宸境科技获数千万美元融资,斯道资本、OPPO、广汽资本联合领投

    宸境科技获数千万美元融资,斯道资本、OPPO、广汽资本联合领投

    投资界(ID:pedaily)6月4日消息,航天智能公司宸境科技宣布获得数千万美元融资,由斯道资本领投,OPPO、广汽资本联合领投。 上汽加州创业投资基金、越秀产业基金、复星锐正资本、新旺达集团等也参与投资,老股东火山石资本持续支持投资。 本轮融资的完成体现了新老股东对宸

    06-17

  • 富士生物完成数千万元A轮融资,深圳尚扬投资

    富士生物完成数千万元A轮融资,深圳尚扬投资

    投资圈(ID:pedaily)2月1日报道称,成都富士生物科技有限公司(以下简称富士生物)与深圳尚扬资产管理有限公司(以下简称深圳尚扬)签署战略投资协议。 深圳尚扬对富士生物科技进行数千万元A轮投资。 该投资已于近期完成。 富士生物科技于今年4月成立。 公司专注于创新分子

    06-18

  • 耀明投资完成第二轮逾3300美元融资

    耀明投资完成第二轮逾3300美元融资

    2020年8月15日,专注于中国早期及成长期的私募股权投资基金耀明投资宣布完成首支基金——华禾创业投资基金(以下简称“基金”)第二期募资。 注册于开曼群岛的华禾创业投资基金已获得超过3300万美元的投资承诺,超过其最初3000万美元的融资目标。   在刚刚结束的第二轮融资

    06-18

  • 政府资金注入芯片行业:在很多环节消耗殆尽

    政府资金注入芯片行业:在很多环节消耗殆尽

    环球网讯 - 自由软件之父理查德斯托曼曾在网络上公开表示:他使用配备中文的电脑的原因CPU的一是国外公司的CPU有后门。 国内相关单位对部分国外CPU经过严格测试,确认存在功能不明的“冗余”模块。 他们还发现有超过20条未公开的指令,包括加密、解密和浮点运算。 其中,有3条

    06-06

  • 申通快递:1月快递服务业务收入25.65亿元,同比增长21.27%

    申通快递:1月快递服务业务收入25.65亿元,同比增长21.27%

    申通快递发布今年1月份业务简报。 报告显示,1月份快递服务业务收入25.65亿元,同比增长21.27%;完成业务额9.89亿元,同比增长17.30%。

    06-17

  • 万物新生活(爱回收)最新财报:营收29.6亿,非GAAP营业利润再创新高

    万物新生活(爱回收)最新财报:营收29.6亿,非GAAP营业利润再创新高

    投资界(ID:pedaily)8月23日消息,二手消费电子交易及服务平台万物新生(爱回收)集团发布第二季度业绩报告。 财报显示,今年第二季度,五五新生集团总营收29.6亿元,同比增长38.1%,超过业绩指引高端。 本季度实现非美国通用会计准则营业利润1万元(调整后,剔除员工股权激

    06-18