RabbitMq & AMQP

rabbitmq是一个实现了AMQP(高级消息队列协议)的消息队列系统。消息队列使用消息将通信双方链接起来,使得消息通过像rabbitmq这种消息代理服务器在不同的程序之间进行路由。这就像是在通信双方之间放置了一座邮局一样。rabbitmq不是对AMQP协议的唯一实现,与其说我们在学习rabbitmq,倒不如说我们在学习AMQP,一种标准的消息通信协议。

RabbitMq中的路由模型

rabbitmq作为一个邮局的角色,将通信的双方链接了起来。通信双方生产者和消费者的身份可以随意的变换,对于生产者和消费者来说,他们都不知道彼此的存在。 生产者生产消息发送给rabbitmq的服务器,消费者接受rabbitmq服务器发送的消息进行消费。通过rabbitmq服务器路由的消息通常分为两部分:

  1. 消息主体内容(以二进制形式存储)
  2. 消息标签

主体内容不用解释,消息标签也很容易理解,它相当于有这条消息的一个描述,会决定这个消息最终的去向。

信道

想要发送和接受消息,通信双方首先都需要与rabbitmq服务器建立一条TCP连接。一旦TCP链接打开,我们就可以在其上建立一条AMQP信道。信道是建立在真实的TCP链接之上的,以后众多的AMQP命令都是通过信道来完成的。之所以不直接用TCP链接,是因为多线程频繁发送AMQP命令的时候 ,如接受消息,发送消息等。都会在短时间建立大量的TCP链接,首先这对系统的资源是一个极大的浪费,其次,线程的调度会使得TCP链接频繁的建立和关闭,这势必会影响到消息通信的性能。 我们的目标是在不影响性能和使用较少资源的前提下,满足多线程工作对于链接的需求。线程启动之后,在TCP连接上建立属于自己的AMQP信道,在不影响性能的前提下也保证的了通信的私密性。并且 ,在一条TCP链接上,一秒可以建立成千上万个AMQP信道,对于高并发系统的多线程调度是完全能够满足的。

AMQP的协议模型

AMQP协议栈中,有三个不可或缺的部分,以自上而下的顺序来展示,分别是:

  1. 交换器
  2. 绑定
  3. 队列

队列

队列是最接近消费者的部分,也是rabbitmq中消息传递的终点。队列核心的作用有两个:

  1. 存储未派发的消息
  2. 派发消息给相应的消费者

一条消息从生产者发出,首先要发送到交换器,交换器根据特定的规则将消息发送给相应的队列。此时消息存储在队列中。 如果当前有消费者订阅了该队列,那么就将消息派发给消费者,否则继续保存此消息。消费者订阅队列的方式也有以下两种:

  1. 单次订阅: 通过AMQP提供的basic.get方法,每当想要处理消息的时候,都订阅一个队列,接收到消息之后取消订阅。等到再次需要消息的时候,与队列重新建立订阅关系,获取消息。
  2. 持续订阅: 通过AMQP提供的basic.consume方法,订阅一个队列,自动不断的接收队列派发的消息。直到主动取消与队列的订阅关系为止。

通过上面的描述,很容易看出,单次订阅是有需要的时候再去消息,消费者是主动的。而持续订阅消费者是被动的,要一直等待接收队列中的消息。 但是切记不要把单次订阅放在死循环中来模拟持续订阅的效果,这将会对rabbitmq的性能造成极大的影响。

队列中的消息派发给消费者之后,并不会立刻将这个消息从队列中删除,而是会等消费者回复一个确认的消息来确定此消息已经被成功接收了。这个时候,rabbitmq的队列才会放心的将此消息移除。rabbitmq在等待某个消息的确认信息之前,会一直在队列保存着这个消息。这样一来即使消费者因进程崩溃等原因断开了与mq的链接, rabbitmq会认为该消息没有被成功派发,进而会派发给其他订阅这个队列的消费者。假设没有这种机制,队列将消息派发出去之后并不知道该消息是否派发成功就删除它,那么, 如果此时消息派发失败,这个消息就被彻底丢掉了。这种成功接收消息的确认机制能够保证,队列中任意一个消息都会被消费者成功处理。

上面说的情况,是建立在队列派发的消息正是消费者需要处理的消息的前提下。假设,消费者并不想处理队列派发过来的消息, 通常有两种方式可以选择。

  1. 在向rabbitmq发送确认信息前,主动断开链接。这个时候,rabbitmq会认为此消息没有被成功接受,会派发给其他订阅的消费者
  2. 调用AMQP提供的Reject命令,来拒绝接受此消息。

需要说明的是,在调用reject命令的时候,有一个叫做requeue的布尔型参数需要设置。若该值为true,那么被拒绝的消息会重新回到队列中等待被派发。 如果为false,那么该消息将会被发送至一个“死信”队列中。该队列中的消息都是一些被拒绝且不需要继续处理的。

要按照规则使用队列之前,首先需要创建队列。在AMQP中,消费者和生产者都可以创建队列,但是同一个队列只能被成功创建一次,另外一次的创建就是无效的,但是不会报错。 对消费者来说,如果已经在某个信道上订阅了某个队列,那么在对这个队列取消订阅之前,不得再创建新的队列。创建队列的时候会需要提供一个队列的名称,如不提供则rabbitmq会为你指定一个默认的名称。至于到底是由生产者来创建队列还是消费者来创建队列,最好的办法是消费者和生产者都创建。首先,创建两次并不会造成什么不利的影响,因此猜测创建队列的操作应该是幂等的。 其次,无论是让谁单独来创建,都有可能在消息从交换器发送过来的时候,队列还没有创建成功。如果出现这种情况,那么消息就会被丢弃。综合上面两个因素,生产者和消费者都做创建队列的工作比较稳妥。

交换器和绑定

有了交换器和绑定,他们和队列在实现了rabbitmq中的消息路由框架。一个完整的消息传递过程如下:生产者生产消息,消息由主体内容和标签构成。生产完毕后,将消息发送给交换器。交换器接收到消息后, 会根据消息的标签来决定将其投放给哪一个队列。这个标签也被称为路由键。队列需要携带特定的路由键绑定到交换器上。如果消息携带的路由键和队列绑定在交换器上的路由键相匹配,那么此时该消息就会被发送给这个队列。 否则,消息中的路由键没有和任何一个队列的路由键匹配,该消息将会被丢弃。投放到队列中的消息会被订阅该队列的消费者取走处理。

之所以使用这么复杂的方式,将生产者的消息送到消费者的身边。除了能够通过交换器和路由键实现多样化的通信方式之外,还有的一个好处就是,通信的双方都不需要在意对方的任何状态。他们交互的对象都是rabbitmq,这样一来 ,即使开发过程中出现消费者和生产者的更换,或者使用不同的语言来实现消费者和生产者,切换起来都是无障碍的,非常方便。在代码层面上,生产者和消费者也就彻底的解耦了。

交换器

交换器的类型决定了消息通信的使用场景,大致分为以下几类:

  1. direct
  2. fanout
  3. topic

direct

direct交换器的作用非常简单:只要消息的路由键和队列的路由键匹配,那么就将消息投放到相应的队列中。rabbitmq服务器会自动实现一个direct类型的交换器。

fanout

见名知意,fanout类型的交换器即为扇形交换器。他会将收到的消息投放给所有绑定到该交换器上面的队列。这种特性能够帮助我们对同一个消息采取多种的处理方式。

topic

topic交换器和其他几种类型最大的不同就是在路由键上可以使用通配符,以此来实现不同来源的消息可以被投放进一个相同的队列。像direct,一个消息再发给交换器的时候, 它自身携带的路由键就已经决定了它被投递的队列是那个,队列上的路由键也决定了它只能够接收带有同样路由键的消息。topic在队列的路由键上支持两种通配符

  1. * : *号代表可以匹配一个特定位置的0个和多个字符。有*的路由键里,.号被当做是分隔符。
  2. #: #号代表全匹配,此时.号被当做是普通字符。

举例来讲,如果队列绑定交换器时指定的路由键是*.abc.com那么,该队列将会接受一切路由键以.abc.com为后缀的消息。通过上面的阐述,我们能够了解到。 一个完成的消息路由过程,需要消息,交换器,队列互相合作。队列要携带特定的路由键绑定交换器才会收到消息。一个消息最后被投放到哪个,或者哪几个队列,是由交换器的类型和其路由键来决定的。

Vhost:多租户模式的虚拟主机与隔离

vhost是AMQP中的一个概念,vhost对于AMQP就相当于虚拟主机对于物理主机一样。都是在逻辑上实现多实例的分离,使得我们可以在同一台rabbitmq服务器上启动多个vhost,来为多个应用程序服务。 一方面,vhost严密的隔离性使得我们不需要担心多个应用程序之间消息数据的混乱以及交换器和队列的命名冲突问题,还有一个就是减少了我们管理多个rabbitmq的成本。一个vhost就相当于一个小的rabbitmq实例, 除了在链接rabbitmq的时候需要指定特定的vhost名称,其余没有特别的使用限制。而且,在没介绍vhost之前,我们就已经在使用了,只不过使用的是默认的vhost,它的名称是”/“。

vhost必须在连接的时候就指定。例如,当我们要使用amqp协议链接rabbitmq时,会建立一个这样的url,amqp://user:passwd@localhost:port/vhost。当我们要链接rabbitmq的时候,如果我们在url中指定了vhost,那么我们就只能访问这个vhost中的队列和交换器。