我们一般在异步系统的实现中,选择使用消息队列.其中一个比较重要的原因就是担心数据储存在内存中会因为物理上的影响而丢失。消息队列中都有使消息持久化的机制。本次我在项目中应用rabbitmq的时候,也有消息持久化的需求,结合实际经验,介绍一下rabbimq中消息持久化的原理。

为什么需要持久化

一般来说,异步的系统在处理某一个任务的时候,都不会一次性将任务当中所有的逻辑处理完。很多时候会因为服务重启,优先级高低,甚至是硬盘损坏等原因而影响了任务的正常运行。既然有中断,就证明处理中的任务会丢失。如果不对消息持久化的话,服务重启之后可能就忘记了之前处理过但是没处理完的任务,导致了系统中有很多的垃圾任务存在。一个基本的异步系统是需要在服务遇到问题重启之后,将之前未完成的任务重新加载继续执行的。

诚然,这种功能可以通过数据库来实现。但是,一方面利用数据库重新实现一个消息队列的时间较长,功能不完善,还有一个就是,异步系统中任务数量一旦增多,优先级和并发度也是需要我们考虑的。综合来讲,使用rabbitmq这种消息队列是非常适合的。

rabbitmq消息持久化三要素

持久化的队列和交换器

rabbitmq所在的服务器可能会因为各种情况而重启,如果不做设置的话,重启前建立的交换器和队列在重启后都会消失,不会重新建立。在创建队列和交换器的时候,可以设置durable的属性为true,即可实现队列和交换器的持久化。每次rabbitmq因故重启的时候,都会重新创建这些队列和交换器,而不需要我们再主动创建。

持久化的消息投递模式

现在我们有了持久化的队列和交换器,如果想实现消息持久化的话,需要将我们投递的消息也改成一种“持久化的模式”。在send消息的时候,将消息的投递模式设为常数2,此时该消息被标记为持久化的消息。

投递消息到持久化的队列和交换器中

有了持久化的消息,队列,交换器,现在要做的就是将持久化的消息投递到持久化的交换器,最终达到持久化的队列。如果持久化的消息投递到了非持久化的队列或者交换器,很有可能因为rabbitmq重启而导致丢失。

消息持久化的实现

持久化消息在被发送到持久化的交换器之后,消息的相关信息会被写入一个持久化的日志文件里,这个日志文件是存储在硬盘的。只有这个消息被写入了这个日志之后,你的发送消息的操作才会得到响应。消息到达队列之后,如果在未消费之前,rabbitmq重启,此时队列和交换器都会重新自动建立,然后读取持久化日志中的消息,重新插入到相应的队列或交换器中。正常被消费掉的消息在持久化日志里面的记录会被标记为待收集处理的状态,这里的正常消费是只消息被拿走处理并且被ACK掉的。

最后的手段

如果你觉得有了上面的手段就可以保证消息肯定不会丢失,那确实是为时过早了。根据上面提供的方法,你是否注意到,在消息写入到持久化日志之前如果出现异常,是不是也会出现消息丢失的情况呢。所以说,最靠谱的保持消息不丢失的办法就是,在消息没有被正常保存到持久化日志之前通知消息的生产者,让它重发一遍不就行了么?直到消息真正的被持久化起来。

rabbitmq在信道这一层级为我们提供了一个发送确认的模式,当我们使用的信道开启了这种模式的时候,消息在到达了持久化队列并且成功写入了持久化日志之后会给消息的生产者发送一个“确认接受”的消息,生产者接受这个消息之后可以根据自定义的回调函数才执行一些逻辑。同样,如果消息没有被正确的投递或者持久化,rabbitmq也会发送一个nack的消息给生产者,代表这个消息已经丢失了,可以在相应的回调函数中来决定是否要重发这条消息。