新规实施:申请手机号码将全面实行人脸比对
06-18
基本介绍了什么是死信开关。定义业务队列时,必须考虑指定死信开关。
死信交换机可以绑定到任意普通队列,然后出现在业务队列中。当发生死信时,数据将被发送到死信队列。
什么是死信队列?死信队列其实就是一个普通的队列,只不过这个队列绑定了一个死信交换器,用来存储死信。 RabbitMQ中有一个交换叫DLX,全称为Dead-Letter-Exchange。
,可以称为死信交换器。当消息在队列中成为死消息时,它将被重新发送到另一个交换器。
这个交易所是DLX。绑定到DLX的队列称为死信队列。
需要说明的是,DLX也是一个普通的交换机,和一般的交换机没有什么区别。可以在任何队列上指定,它实际上设置了某个队列的属性。
当这个队列中出现死信时,RabbitMQ会自动将消息重新发布到设定的DLX上,然后路由到另一个队列,即死信队列。当消息进入死信队列时,消息过期代码语言:javascript copy MessageProperties messageProperties=new MessageProperties();//设置这条消息的过期时间为10秒 messageProperties.setExpiration("0");队列过期代码语言:javascript Copy Map
此时,消息进入死信队列。application.yml 启动手动确认代码 language: javascript copy spring:rabbitmq:listener: simple:acknowledge-mode:manual code language: javascript copy /** * 正在监听正常的队列名称,不是死信queue * 我们从普通队列接收消息,但不确认消息,也不重新投递消息。
此时消息进入死信队列* *通道消息通道(是一个连接下的消息通道,一个连接下有多个消息信息,发送/接收消息都是通过通道完成的) */ @RabbitListener( queues = {RabbitConfig.QUEUE}) public void process(Message message, Channel channel) { System.out.println("收到消息:" + message); //不确认消息,ack这个词的意思是确认 try { System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag()); //手动启用rabbitm消息消费确认模式然后编写这样的代码; Channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); void basicNack(long DeliveryTag, boolean multiple, boolean requeue) DeliveryTag:消息的数字标签 multiple:翻译成中文就是多个,如果为true则表示对于所有小于 DeliveryTag 标签的消息,不会确认 Nack。 False 表示只有带有当前的deliveryTag 标签的消息才会被Nacked。
Requeue:如果为true,表示消息被Nacked后会重新发送到队列中。如果为 false,则消息被 Nacked 后不会重发。
发送给队列消费者拒绝消息,开启手动确认模式,拒绝消息而不重新投递,则进入死信队列 代码语言:javascript copy /** * 正常监听的队列名称,不是死信队列 * 我们从普通队列接收消息,但不确认消息,也不重新投递消息。此时消息进入死信队列**通道消息通道(是连接下的消息通道,一个连接下有多个消息信息,发送/接收消息都是通过通道完成的)*/ @RabbitListener(queues) = {RabbitConfig.QUEUE}) public void process(Message message, Channel channel) { System.out.println("收到消息:" + message);尝试 { System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag()); //要开启rabbitm消息消费的手动确认模式,那么代码是这样写的; Channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); Channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); basicReject 为负数 一般在消费消息发生异常时执行投递。
可以丢弃消息或者重新排序来重新处理消息 参数 1:消费者消息的索引参数 2:处理异常消息,true 表示重新排序,false 表示丢弃 Reject 拒绝消息时,可以使用 requeue 标志来告诉 RabbitMQ 是否需要重新发送给其他消费者。如果为false,则不会重发,一般消息会被RabbitMQ丢弃。
拒绝一次只能拒绝一封邮件。如果为 true,则消息将被重新传送。
Nack 与 Reject 类似,只不过它可以一次拒绝多条消息。您还可以使用重新排队标志,它是 AMQP 规范的 RabbitMQ 扩展。
通过RejectRequeuConsumer可以看到,无论是使用Reject方法还是Nack方法,当requeue参数设置为true时,都会重新投递消息。当 requeue 参数设置为 false 时,消息将丢失。
Springboot代码实战架构编辑如上图。消息到达正常交换机exchange.nomal.a。
通过绑定到普通队列queue.noaml.a,消息将到达普通队列。如果消息死了,稍后会转发。
到与普通队列绑定的死信交换机,死信交换机会将其转发到与其绑定的死信队列queue.deal.a。项目概述 项目采用springboot架构,主要用到的依赖有: 代码语言:javascript copy
端口: 用户名: admin 密码: 56 虚拟主机: /RabbitConfigDeal 配置类: 创建队列和交换机并绑定它们 代码语言: javascript copy @Configurationpublic class RabbitConfigDeal {} create 普通交换机 代码语言: javascript copy @Bean public DirectExchange normalExchange() { return ExchangeBuilder.directExchange("exchange.normal.a").build();创建死信开关代码语言: javascript copy @Bean public DirectExchange deadExchange() { return ExchangeBuilder.directExchange("exchange.dead.a").build(); } 创建死信队列 代码语言:javascript copy @Bean public Queue deadQueue(){ return QueueBuilder.durable("queue.dead.a").build() ;创建一个普通队列,设置其绑定的死信开关,以及对应的绑定路由键为命令码语言: javascript copy @Bean public Queue normalQueue(){ Map
RabbitMQ的序列化是指Message的body属性,也就是我们真正需要传输的东西,RabbitMQ抽象了一个MessageConvert接口来处理消息的序列化。其实有SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等接受消息代码语言: javascript copy @RabbitListener(queues = {"queue.dead.a"}) public void receiveMsg(Message message){ byte[] body = message.getBody(); String queue = message.getMessageProperties().getConsumerQueue(); String msg=new String(body); log.info ("{}收到消息时间:{},消息为{}",queue,new Date(),msg); } Message 在消息传递过程中,实际传递的对象是 org.springframework.amqp.core .Message,主要由两部分组成: MessageProperties // 消息属性 byte[] body // 消息内容 @RabbitListener 使用@RabbitListener 注解来标记该方法,当调试队列中有消息时,就会接收并处理消息处理。
方法中的参数通过MessageConverter进行转换,如果使用自定义的MessageConverter,则需要在RabbitListenerContainerFactory实例中设置消息的content_type属性(Spring默认使用的是SimpleRabbitListenerContainerFactory)。身体数据被存储。
除了使用Message对象接收消息(包括消息属性等信息)之外,还可以直接使用对应的类型接收消息体内容,但如果方法参数类型不正确,会抛出异常: application/octet-stream:二进制字节数组存储,使用byte[]application/ x-java-serialized-object:Java对象序列化格式存储,使用Object和对应的类型(反序列化时类型应与包同名,否则会抛出类未找到异常)text/plain:文本数据类型存储,使用Stringapplication/json:JSON格式,使用Object,对应类型启动Animal class RabbitMq01Application:实现ApplicationRunner接口代码语言:javascript copy/** * @author 风清云丹*/@SpringBootApplicationpublic class RabbitMq01Application Implements ApplicationRunner { public static void main(String[] args) { SpringApplication.run(RabbitMq01Application.class ,参数); @Resource 私有 MessageService 消息服务; /** * 程序启动时就会调用该方法* @param args * @throws Exception */ @Override public void run(ApplicationArguments args) throws Exception { messageService.sendMsg();在SpringBoot中,提供了一个接口:ApplicationRunner。在这个界面中,只有一个run方法。
它的执行时机是:spring容器启动后,会立即执行该接口实现类的run方法。由于该方法是在容器启动后执行的,因此可以从spring容器中获取其他注入的bean。
启动主启动类后查看控制台:代码语言:javascript 复制09-28 10:46:17。 INFO 0 --- [main] c.e.rabbitmq01.service.MessageService:发送消息时间:Thu Sep 28 10:46:17 CST 09-28 10:46:37。
INFO 0 --- [ntContainer#0-1] c.e.rabbitmq01.service.MessageService:queue.dead.a 收到消息时间: 9 月 28 日星期四 10:46:37 CST,消息 问候语!这里我们可以看到消息是在17s发送的。 20s后,即37s时,我们收到了死信队列queue.dead.a中的消息。
?????????????????????????????我参加腾讯科技创造特训营第二期有奖征文比赛瓜分奖金池10000元和键盘手表一块。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-18
06-17
06-17
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用