RabbitMQ快速入门(三)Work Queues

前面介绍了简单的RabbitMQ示例:一个生产者、一个消费者。然而,面对复杂耗时的任务时,为了提高工作效率,往往会使用多个消费者进行消费。此时的工作队列(Work Queue)需要将消息分发给多个消费者。如图1-1所示:

work queue1-1

图1-1

         为了模拟出复杂、耗时的任务,我们假设每条字符串消息代表一项任务,并且字符串中的点(.)代表该项任务耗时的多少(每个点代表1秒)。比如,”hello world…”表示这项任务将耗时3秒钟完成。

1.生产者应用程序

(1)建立连接,跟前面的hello world示例如出一辙:

(2)声明队列:

其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。

(3)生产消息:

其中,

1)指定默认交换器为’’,指定路由关键字为’task_queue’;

2)指定消息属性为2,即持久化消息。

(4)关闭连接:

2.消费者应用程序

(1)建立连接,同上。

(2)声明队列,同上。

(3)定义消费信息时的回调函数:

其中,

1)根据消息字符串中点的个数,休眠相应的时间;

2)消费完消息之后,返回确认状态。

(4)预取消息:

预取消息的条数为1。

(5)开始消费

3.消息确认

对比前文中的示例1和此文中的示例2中的回调函数callback()会发现稍有不同。

示例1中显示的关闭的消息确认机制,这是因为,默认情况下消息确认机制是开启的:

示例2未显示关闭,则表示启用了消息确认机制,这保证了消费者应用程序在处理消息的过程中若异常崩溃,不会丢失当前处理的消息。因为,消费者应用程序崩溃之后,消息代理(message broker)无法收到该消息返回的ack,那么它会重新将这条消息发送至队列中去(re-queue),这样的话,其他的消费者就可以接着处理这条消息了。

简单地说,启动了消息确认机制之后,消息代理在没有收到ack之前,该条消息不会从消息代理中移除。

4.消息持久性

消息确认机制使得在某个消费者挂掉之后,该消息仍然能够得到其他的消费者进行正常处理。然而,若RabbitMQ服务异常关闭,就没有这么幸运了。

所以,若想在RabbitMQ服务重启之后,队列和消息不会丢失,则需要将它们的属性设置成持久化。

队列持久化:

消息持久化

5.预取消息

假设有编号为1-10的十条消息(任务),其中奇数编号的任务耗时很长,偶数编号的任务耗时很短。消费者C1会被分配奇数编号的任务,消费者C2会被分配执行偶数编号的任务,这会导致一种情况:C2会很快的执行完所有的偶数编号任务,C1仍然在执行第1条任务,而第3、5、7、9虽然已经分配给了C1却在等待着被执行。

此时,即使空闲的C2却没法帮C1分担任务。

使用预取消息功能,可以解决类似问题,如图5-1所示:work queue5-1

图5-1

         设置消费者每次从队列里预取一条消息。这样一来,RabbitMQ每次只会发送一条消息给消费者,并且在消费者处理完该条消息之前(或者说在返回ACK之前),消费者不会再接收到其他消息:

6.示例

(1)生产者应用程序new_task.py

(2)消费者应用程序work.py

(3)运行

先运行两个消费者,再生产4条消息:

#消费者C1

#消费者C2

#生产者

7.其他

本文主要学习了RabbitMQ的任务分发机制,介绍了队列持久性、消息持久性、消息确认以及预取消息等特性,并给出了相应的编码示例。

参考:
http://www.rabbitmq.com/tutorials/tutorial-two-python.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注