京都鲜酿获千汇资本战略种子轮投资
06-17
基本介绍消息的可靠传递就是要保证消息传递过程中的每一个环节都是成功的,那么这肯定会牺牲一些性能,性能和可靠性不能兼得;如果在业务实时一致性要求不是特别高的场景下,可以牺牲一些可靠性来换取性能。 ?Edit ① 代表生产者发送到 Exchange 的消息; ②表示消息从Exchange到Queue的路由; ③ 代表队列中存储的消息; ④代表消费者监听Queue并消费消息; rabbitmq的整个消息传递路径是:生产者—>rabbitmqbroker->exchange->queue->consumer消息会从生产者到exchange返回一个confirmCallback。
如果exchange->queue消息传递失败,会返回一个returnCallback。Producer->exchange:确保消息发送到RabbitMQ服务器的exchange。
消息将从生产者返回到交易所。 Confirm模式消息的confirmCallback确认机制是指生产者投递消息后,到达消息服务器Broker中的交换交换机。
将向制作人做出答复。 Producer收到响应后,用于判断消息是否正常发送到Broker的exchange。
这也是消息可靠传递的重要保证;?编辑???????具体代码设置配置文件application.yml开启确认模式:spring.rabbitmq.publisher-confirm-type=corlated。编写一个类来实现实现RabbitTemplate.ConfirmCallback来确定成功和失败的ack结果。
你可以根据具体的结果。如果 ack 为 false,则重新发送或记录消息;设置rabbitTemplate的确认回调方法rabbitTemplate.setConfirmCallback(messageConfirmCallBack);代码语言:javascript copy @Component public class MessageConfirmCallBack Implements RabbitTemplate.ConfirmCallback { /** * 交换机收到消息后,会 Callback 这个方法 * * @param correlationData 关联数据 * @param ack 有两个值,true 和 false, true 表示成功:消息正确到达交换机,否则 false 表示消息没有正确到达交换机 * @param Cause message 没有正确到达交换机的原因是什么*/ @Override public void recognize(CorrelationData correlationData, boolean ack , 字符串原因) { System.out.println("correlationData = " + correlationData); System.out.println("ack = " + ack); System.out.println("原因 = " + 原因); if (ack) { //正常} else { //异常,可能需要记录或重新发送} } } 消息参考代码(需要初始化RabbitTemplate) 代码语言:javascript copy @Servicepublic class MessageService { @Resource private RabbitTemplaterabbitTemplate; @Resource private MessageConfirmCallBack messageConfirmCallBack; @PostConstruct //当bean初始化时,该方法会被调用一次。
它只被调用一次,起到初始化的作用。公共无效初始化(){rabbitTemplate.setConfirmCallback(messageConfirmCallBack); } /** * 发送消息*/ public void sendMessage () { //关联数据对象 CorrelationData correlationData = new CorrelationData(); correlationData.setId("O3"); //比如设置一个订单ID,那么在confirm回调中,就可以知道哪一个订单还没有发送到Switch上rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE+,“信息”,“你好”,相关数据); System.out.println("消息已发送..."); }}事务(transaction)模式 RabbitMQ 支持事务(transaction),在 RabbitMQ 中支持事务机制相关的方法有 3 个:txSelect()、txCommit() 和 txRollback() (1) txSelect 用于设置当前通道进入事务模式,并通过调用 tx.select 方法启用事务模式。
(2) txCommit用于提交事务。当启用事务模式时,只有在所有镜像队列都保存了一条消息后,RabbitMQ才会调用tx.commit-ok并返回给客户端。
(3) txRollback 用于回滚事务。通过txSelect打开交易后,我们可以将消息发布到broker代理服务器上。
如果txCommit提交成功,则消息一定已经到达broker。如果在执行txCommit之前broker异常崩溃或者由于其他原因抛出异常。
这时,我们可以通过txRo??llback捕获异常并回滚事务。代码语言:javascript 复制channel.txSelect(); // 设置当前通道为事务模式/** * ConfirmConfig.exchangeName(交换机名称) * ConfirmConfig.routingKey(路由key) * message(消息内容) */channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties. PERSISTENT_TEXT_PLAIN,消息));channel.txCommit(); // 提交事务channel.txRollback(); // 回滚事务 注意:事务确实可以解析生产者和broker之间的消息关于确认,只有当消息被broker成功接受时,事务提交才能成功。
否则,我们可以捕获异常并在重传消息的同时执行事务回滚操作。事务机制的缺点:使用事务机制会降低RabbitMQ的性能。
它会造成生产者和RabbitMq之间的同步(等待确认),这也违背了我们使用RabbitMq的初衷,所以很少使用。交换->队列:确保消息从交换发送到队列。
② 可能因路由关键字不正确、队列不存在或队列名称不正确而失败。使用返回模式,可以在消息无法路由时将消息返回给生产者;当然,在实际生产环境中,我们不会出现这个问题,上线前我们会进行严格的测试(这种问题很少见);消息来自exchange –>queue 如果投递失败,会返回一个returnCallback。
返回模式开启确认模式;使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange时,会回调confirm方法。
在方法中判断ack。如果为真,则发送成功。
如果为false,则发送失败,需要处理。注意,在配置文件中,启用返回模式; spring.rabbitmq.publisher-returns: true 使用rabbitTemplate.setReturnCallback设置返回函数。
,当消息从exchange路由到queue失败时,会将消息返回给生产者,并执行回调函数returnedMessage;代码语言: javascript copy @Componentpublic class MessageReturnCallBack Implements RabbitTemplate.ReturnsCallback { /** * 当消息没有从交换机正确返回到队列时,会触发该方法 * 如果消息从交换机正确到达队列,那么这个方法就不会被触发 * * @param returned */ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println(" 消息返回方式:" + returned);代码语言:javascript copy @Servicepublic class MessageService { @Resource private RabbitTemplaterabbitTemplate; @Resource private MessageReturnCallBack messageReturnCallBack; @PostConstruct //当bean初始化的时候,这个方法会被调用一次,只调用一次,起到初始化的作用 public void init() {rabbitTemplate.setReturnsCallback(messageReturnCallBack); } /** * 发送消息 */ public void sendMessage() {rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "信息", "你好"); System.out.println("消息已发送..."); }} 备份交换机(alternate-exchange) 使用备份交换机(alternate -exchange),不可路由的消息将被发送到该备份交换机。备份交换机可以理解为RabbitMQ中交换机的“备胎”。
当我们为某个交换机声明对应的备份交换机时,我们就为它创建了一个备胎。 ,当交换机收到不可路由的报文时,会将报文转发给备份交换机,由备份交换机进行转发和处理。
通常备份交换机的类型是Fanout,这样所有的消息都可以投递到它所绑定的队列中,然后我们在备份交换机下面绑定一个队列,这样所有原交换机无法路由的消息都会进入这个队列。到达这个队列后,消费者就可以处理它,并通知开发者检查。
当消息经过交换机准备路由到队列时,却发现没有对应的队列来传递信息。在rabbitmq中,消息默认会被丢弃。
如果我们想监控哪些消息被投递到没有对应队列的队列中,我们可以使用备份交换机。实现上可以接收来自备用交换机的消息,然后记录日志或发送警报信息。
设置参考代码代码语言:javascript copy Map
建造();切换持久化代码语言:javascript copy ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();消息持久化代码语言:javascript copy MessageProperties messageProperties = new MessageProperties();//设置消息持久化,当然默认是持久化的,所以不需要设置。可以查看源码 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);集群、镜像队列、高可用 即使 RabbitMQ 节点崩溃,也允许消费者和生产者继续运行 如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的故障将导致整体服务暂时不可用,并可能也会导致消息丢失。
所有的消息都可以设置为持久化,并且对应队列的durable属性也设置为true,但这仍然无法避免缓存带来的问题:因为消息发送的时间和写入的时间之间存在差距到磁盘并执行刷新动作。一个短暂但有问题的时间窗口。
发布者确认机制确保客户端知道哪些消息已存储在磁盘上。但是,您通常不希望遇到由于单点故障而导致服务不可用的情况。
引入镜像队列机制,可以将队列镜像到集群中的其他Broker节点。如果集群中的一个节点出现故障,队列可以自动切换到镜像中的另一个节点,以保证服务的连续性。
可用性。 确保消息正确地从队列传递到消费者。
编辑器在消费消息时采用手动ack确认机制;如果消费者接收到消息之前发生异常,无法对其进行处理,或者在处理过程中发生异常,都会导致④失败。为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(message acknowledgment);代码语言: javascript copy #启用手动ack消息消费确认 spring.rabbitmq.listener.simple.acknowledge-mode=手动消费在订阅队列时,通过上述配置,不使用自动确认,使用手动确认,RabbitMQ会等待消费者显式回复确认信号,然后再从队列中删除消息;如果消息消费失败,还可以调用 basicReject() 或 basicNack() )来拒绝当前消息,而不是确认它。
如果requeue参数设置为true,则可以将消息重新存入队列中发送给下一个消费者(当然,当只有一个消费者时,这种方法可能会造成重复消费的死循环。可以投递到新队列,或者只打印异??常日志);??????我参加第二期腾讯科技创造特训营有奖征文分享万元奖池和键盘手表。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-17
06-06
06-06
06-18
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用