博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【转载】rabbitmq的发布确认和事务
阅读量:5964 次
发布时间:2019-06-19

本文共 5792 字,大约阅读时间需要 19 分钟。

地址:https://my.oschina.net/lzhaoqiang/blog/670749

摘要: 介绍confirm的工作机制。使用spring-amqp介绍事务以及发布确认的使用方式。因为事务以及发布确认是针对channel来讲,所以在一个连接中两个channel,一个channel可以使用事务,另一个channel可以使用发布确认,并介绍了什么时候该使用事务,什么时候该使用发布确认

confirm的工作机制

       ‍ Confirms是增加的一个确认机制的类,继承自标准的AMQP。这个类只包含了两个方法:confirm.select和confirm.select-ok。另外,basic.ack方法被发送到客户端。

       ‍ confirm.select是在一个channel中启动发布确认。注意:一个具有事务的channel不能放入到确认模式,同样确认模式下的channel不能用事务。

        当confirm.select被发送/接收。发布者/broker开始计数(首先是发布然后confirm.select被记为1)。一旦channel为确认模式,发布者应该期望接收到basic.ack方法,delivery-tag属性显示确认消息的数量。

        当broker确认了一个消息,会通知发布者消息被成功处理;‍

       ‍ basic的规则是这样的:‍

        一个未被路由的具有manadatory或者immediate的消息被正确确认后触发basic.return;

        另外,一个瞬时态的消息被确认目前已经入队;

        持久化的消息在持久化到磁盘或者每个队列的消息被消费之后被确认。

        

        关于confirm会有一些问题:

        首先,broker不能保证消息会被confirm,只知道将会进行confirm。

        第二,当未被确认的消息堆积时消息处理缓慢,对于确认模式下的发布,broker会做几个操作,日志记录未被确认的消息

        第三,如果发布者与broker之间的连接删除了未能得到确认,它不一定知道消息丢失,所以可能会发布重复的消息。

        最后,如果在broker中发生坏事会导致消息丢失,将会basic.nack那些消息

        总之,Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,哪些可能因为broker宕掉或者网络失败的情况而重新发布。

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

事务

Spring AMQP做的不仅仅是回滚事务,而且可以手动拒绝消息,如当监听容器发生异常时是否重新入队。

持久化的消息是应该在broker重启前都有效。如果在消息有机会写入到磁盘之前broker宕掉,消息仍然会丢失。在某些情况下,这是不够的,发布者需要知道消息是否处理正确。简单的解决方案是使用事务,即提交每条消息。

案例:

  RabbitTemplate的使用案例(同步),由调用者提供外部事务,在模板中配置了channe-transacted=true。通常是首选,因为它是非侵入性的(低耦合)

 
@Transactionalpublic void doSomething() {     ApplicationContext context =             new GenericXmlApplicationContext("spring-amqp-test.xml");     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);     String incoming = (String) rabbitTemplate.receiveAndConvert(); // do some more database processing... String outgoing = processInDatabaseAndExtractReply(incoming); //数据库操作中如果失败了,outgoing这条消息不会被发送,incoming消息也会返回到broker服务器中,因为这是一条事务链。 //可做XA事务,在消息传送与数据库访问中共享事务。 rabbitTemplate.convertAndSend(outgoing); } private String processInDatabaseAndExtractReply(String incoming){ return incoming; }

 

 异步使用案例(外部事务)

 

在容器中配置事务时,如果提供了transactionManager,channelTransaction必须为true;如果为false,外部的事务仍然可以提供给监听容器,造成的影响是在回滚的业务操作中也会提交消息传输的操作。

使用事务有两个问题:

Ø  一是会阻塞,发布者必须等待broker处理每个消息。如果发布者知道在broker死掉之前哪些消息没有被处理就足够了。

Ø  第二个问题是事务是重量级的,每次提交都需要fsync(),需要耗费大量的时间。

confirm模式下,broker将会确认消息并处理。这种模式下是异步的,生产者可以流水式的发布而不用等待broker,broker可以批量的往磁盘写入。

发布确认

发布确认必须配置在CachingConnectionFactory上

 

若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true

每个rabbitTemplate只能有一个confirm-callback和return-callback

 //确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){     @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         if (ack) { System.out.println("消息确认成功"); } else { //处理丢失的消息(nack) System.out.println("消息确认失败"); } } });

 

使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,可针对每次请求的消息去确定’mandatory’的boolean值,只能在提供’return -callback’时使用,与mandatory互斥。

 rabbitTemplate.setMandatory(true);//确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //重新发布 RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey"); Throwable cause = new Exception(new Exception("route_fail_and_republish")); recoverer.recover(message,cause); System.out.println("Returned Message:"+replyText); } }); errorTemplate配置: 

 同一个连接不同channel使用事务和发布确认

private RabbitTemplate rabbitTemplate;private TransactionTemplate transactionTemplate; @Before public void init() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("192.168.111.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); template = new RabbitTemplate(connectionFactory); template.setChannelTransacted(true); RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory); transactionTemplate = new TransactionTemplate(transactionManager); connectionFactory.setPublisherConfirms(true); rabbitTemplate = new RabbitTemplate(connectionFactory); }
//发布确认测试@Testpublic void testPublishConfirm(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息确认成功"); }else{ System.out.println("消息确认失败"); } } }); //发送到一个不存在的exchange,则会触发发布确认 rabbitTemplate.convertAndSend("asd","aaa","message"); String message = (String) rabbitTemplate.receiveAndConvert(ROUTE); assertEquals("message",message); }
//事务测试@Testpublic void testSendAndReceiveInTransaction() throws Exception { //由于有spring的事务参与,而发送操作在提交事务时,是不允许除template的事务有其他事务的参与,所以这里不会提交 //队列中就没有消息,所以在channel.basicGet时命令返回的是basic.get-empty(队列中没有消息时),而有消息时,返回basic.get-ok String result = transactionTemplate.execute(new TransactionCallback
() { @Override public String doInTransaction(TransactionStatus status) { template.convertAndSend(ROUTE, "message"); return (String) template.receiveAndConvert(ROUTE); } }); //spring事务完成,对其中的操作需要提交,发送与接收操作被认为是一个事务链而提交 assertEquals(null, result); //这里的执行不受spring事务的影响 result = (String) template.receiveAndConvert(ROUTE); assertEquals("message", result); }

 

转载于:https://www.cnblogs.com/xujishou/p/6296092.html

你可能感兴趣的文章
Linux Namespace系列(09):利用Namespace创建一个简单可用的容器
查看>>
博客搬家了
查看>>
Python中使用ElementTree解析xml
查看>>
jquery 操作iframe、frameset
查看>>
解决vim中不能使用小键盘
查看>>
jenkins权限管理,实现不同用户组显示对应视图views中不同的jobs
查看>>
我的友情链接
查看>>
批量删除用户--Shell脚本
查看>>
Eclipse Java @Override 报错
查看>>
知道双字节码, 如何获取汉字 - 回复 "pinezhou" 的问题
查看>>
Python高效编程技巧
查看>>
js中var self=this的解释
查看>>
linux的日志服务器关于屏蔽一些关键字的方法
查看>>
事情的两面性
查看>>
只要会营销,shi都能卖出去?
查看>>
sed单行处理命令奇偶行输出
查看>>
VC++深入详解学习笔记1
查看>>
安装配置discuz
查看>>
线程互互斥锁
查看>>
KVM虚拟机&openVSwitch杂记(1)
查看>>