前一篇文章学习了工作队列,细心的你或许会发现:生产者生产的每一条特定的消息要么被消费者C1消费,要么被消费者C2消费。也就是说,一条消息只能被一个消费者消费。那么,多个消费者如何消费同一条消息呢?
1.fanout类型交换器
我们将“向多个消费者发送同一条消息”的模式称为“发布/订阅”模式。在RabbitMQ中实现这种模式,依赖于特定类型的交换器。我们知道,生产者生产的消息需要经过交换器,再由交换器路由到指定的队列中。交换器可以将某条消息发送给一个队列或多个队列(如图1-1所示),同样也可以将它丢弃,这些都依赖于交换器的类型。
图1-1
AMQP交换器分为四种类型:direct, fanout, topic, header。
对于fanout类型的交换器,它的实现机制类似于广播机制,会将每一条消息广播给与之绑定的每一个队列。
(1)声明fanout类型交换器
1 |
channel.exchange_declare(exchange='logs', exchange_type='fanout') |
在前面的两个示例中,并没有显示的声明一个交换器,而是使用默认的交换器。此处,我们显示地声明一个fanout类型的交换器logs。该类型的交换器将会把接收到的所有消息广播给所有与之绑定的队列。
(2)向交换器中发送消息
1 2 3 |
channel.basic_publish(exchange='logs', routing_key='', body=message) |
介于fanout类型交换器的特性,此处发布消息并没有指定路由关键字routing_key。
至此,我们已经学会使用两种类型的交换器了——direct、fanout,前面两个示例使用的是默认交换器,默认交换器的类型是direct。
2.临时队列
前面两个示例中,分别声明了“hello”和“task_queue”队列。此处我们声明一个临时队列:不再显示指定队列的名字(而是随机生成以amq.为前缀的队列名),并且将队列属性设置成exclusive——该属性的队列会随着声明该队列的消费者应用程序关闭而关闭:
1 |
channel.queue_declare(exclusive=True) |
临时队列有两点好处:
(1)保证每次连接建立之后的队列都是一个全新的队列(队列中没有残留的未处理消息);
(2)一旦消费者断开连接,临时队列则会被自动删除。避免了消费者离线之后,消息仍然在队列中爆炸式的增长浪费磁盘空间。
3.绑定
将队列与fanout类型交换器绑定,实现了“发布/订阅”模式。这样一来,每当交换器接收到消息之后,就会将消息“发布”给所有“订阅”它的队列中去。
临时队列与fanout类型交换器绑定:
1 2 3 4 |
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) |
绑定操作需要交换器和队列的名字,交换器名字属性声明为了logs,而队列的名字是由消息代理随机生成,通过result.method.queue可以获得。
4.示例
介于临时队列的两个优点,实现一个简单的日志记录系统。该日志系统包括一个生产者用来产生日志;两个消费者,分别用来记录日志到本地和输出日志到屏幕。
(1)生产者emit_log.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message=' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print("Sent %r" % message) connection.close() |
(2)消费者receive_logs.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print('Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print("%r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() |
(3)依次启动消费者1,2及消费者应用程序:
#消费者1,记录日志到本地文件
1 2 3 4 |
[leo@ubuntu 03]$ python receive_logs.py > logs_from_rabbit.log [leo@ubuntu 03]$ cat logs_from_rabbit.log Waiting for logs. To exit press CTRL+C 'info: Hello World!' |
#消费者2,记录日志到标准输出
1 2 3 |
[leo@ubuntu 03]$ python receive_logs.py Waiting for logs. To exit press CTRL+C 'info: Hello World!' |
#生产者
1 2 |
[leo@ubuntu 03]$ python emit_log.py Sent 'info: Hello World!' |
由图4-1所示,消费者生产的日志消息“info: Hello World!”,经过fanout类型的交换器之后,分别转发给了临时队列amq.gen-NNozX8et4BA6CkRlhoD4LA和amq.gen-fb7swQqvCVYBnCgyLAz2dA。
图4-1
参考:
http://www.rabbitmq.com/tutorials/tutorial-three-python.html