前面介绍了简单的RabbitMQ示例:一个生产者、一个消费者。然而,面对复杂耗时的任务时,为了提高工作效率,往往会使用多个消费者进行消费。此时的工作队列(Work Queue)需要将消息分发给多个消费者。如图1-1所示:
图1-1
为了模拟出复杂、耗时的任务,我们假设每条字符串消息代表一项任务,并且字符串中的点(.)代表该项任务耗时的多少(每个点代表1秒)。比如,”hello world…”表示这项任务将耗时3秒钟完成。
1.生产者应用程序
(1)建立连接,跟前面的hello world示例如出一辙:
(2)声明队列:
1 |
channel.queue_declare(queue='task_queue', durable=True) |
其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。
(3)生产消息:
1 2 3 4 5 6 |
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, #make message persisent )) |
其中,
1)指定默认交换器为’’,指定路由关键字为’task_queue’;
2)指定消息属性为2,即持久化消息。
(4)关闭连接:
1 |
connection.close() |
2.消费者应用程序
(1)建立连接,同上。
(2)声明队列,同上。
(3)定义消费信息时的回调函数:
1 2 3 4 5 |
def callback(ch, method, properities, body): print("Received %r" % body) time.sleep(body.count(b'.')) print("Done") ch.basic_ack(delivery_tag = method.delivery_tag) |
其中,
1)根据消息字符串中点的个数,休眠相应的时间;
2)消费完消息之后,返回确认状态。
(4)预取消息:
1 |
channel.basic_qos(prefetch_count=1) |
预取消息的条数为1。
(5)开始消费
1 2 3 |
channel.basic_consume(callback, queue='task_queue') channel.start_consuming() |
3.消息确认
对比前文中的示例1和此文中的示例2中的回调函数callback()会发现稍有不同。
示例1中显示的关闭的消息确认机制,这是因为,默认情况下消息确认机制是开启的:
1 2 3 |
channel.basic_consume(callback, queue='hello', no_ack=True) |
示例2未显示关闭,则表示启用了消息确认机制,这保证了消费者应用程序在处理消息的过程中若异常崩溃,不会丢失当前处理的消息。因为,消费者应用程序崩溃之后,消息代理(message broker)无法收到该消息返回的ack,那么它会重新将这条消息发送至队列中去(re-queue),这样的话,其他的消费者就可以接着处理这条消息了。
简单地说,启动了消息确认机制之后,消息代理在没有收到ack之前,该条消息不会从消息代理中移除。
4.消息持久性
消息确认机制使得在某个消费者挂掉之后,该消息仍然能够得到其他的消费者进行正常处理。然而,若RabbitMQ服务异常关闭,就没有这么幸运了。
所以,若想在RabbitMQ服务重启之后,队列和消息不会丢失,则需要将它们的属性设置成持久化。
队列持久化:
1 |
channel.queue_declare(queue='task_queue', durable=True) |
消息持久化
1 2 3 4 5 6 |
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, #make message persisent )) |
5.预取消息
假设有编号为1-10的十条消息(任务),其中奇数编号的任务耗时很长,偶数编号的任务耗时很短。消费者C1会被分配奇数编号的任务,消费者C2会被分配执行偶数编号的任务,这会导致一种情况:C2会很快的执行完所有的偶数编号任务,C1仍然在执行第1条任务,而第3、5、7、9虽然已经分配给了C1却在等待着被执行。
此时,即使空闲的C2却没法帮C1分担任务。
使用预取消息功能,可以解决类似问题,如图5-1所示:
图5-1
设置消费者每次从队列里预取一条消息。这样一来,RabbitMQ每次只会发送一条消息给消费者,并且在消费者处理完该条消息之前(或者说在返回ACK之前),消费者不会再接收到其他消息:
1 |
channel.basic_qos(prefetch_count=1) |
6.示例
(1)生产者应用程序new_task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, #make message persisent )) #print("Sent %r" % message) connection.close() |
(2)消费者应用程序work.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print('Waiting for message. To exit press CTRL+C') def callback(ch, method, properities, body): print("Received %r" % body) time.sleep(body.count(b'.')) print("Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming() |
(3)运行
先运行两个消费者,再生产4条消息:
#消费者C1
1 2 3 4 |
[leo@ubuntu 02]$ python work.py Waiting for message. To exit press CTRL+C Received '1.................' Done |
#消费者C2
1 2 3 4 5 6 7 8 |
[leo@ubuntu 02]$ python work.py Waiting for message. To exit press CTRL+C Received '2.' Done Received '3.....' Done Received '4.' Done |
#生产者
1 2 3 4 |
[leo@ubuntu 02]$ python new_task.py 1................. [leo@ubuntu 02]$ python new_task.py 2. [leo@ubuntu 02]$ python new_task.py 3..... [leo@ubuntu 02]$ python new_task.py 4. |
7.其他
本文主要学习了RabbitMQ的任务分发机制,介绍了队列持久性、消息持久性、消息确认以及预取消息等特性,并给出了相应的编码示例。
参考:
http://www.rabbitmq.com/tutorials/tutorial-two-python.html