在熟悉了AMQP的相关概念之后,再学习RabbitMQ就比较容易了。毕竟RabbitMQ是对AMQP的实现,并且RabbitMQ提供了包括c/c++, python, java等众多语言可供开发者选择使用。
1.RabbitMQ
RabbitMQ是一个消息中间件,用来接收和转发消息。相关术语我们在前面的章节中已经介绍过了,比如:生产者、消费者、交换器、队列等等。
生产者用来生产消息,用图1.1表示:
图1.1
生产者生产的消息经过交换器路由之后,会进入相应的队列中去,用图1.2表示如下:
图1.2
队列说白了就是一个很大的消息缓冲区,它只受限于主机的内存和磁盘的约束。消费者从这些队列中去接收消息进行消费,消费者用图1.3表示:
图1.3
基于上述三个实体,一个简单的生产者-消费者模型表示如下,这也正是RabbitMQ总体设计的理念:
图1.4
因为AMQP是一种网络协议,所以生产者、消息代理(message broker, 主要负责消息的接收、转发)以及消费者可以分布在不同的主机上。
2.Hello World!
使用python来实现图1.4模型,P生产消息至队列hello中,再由C消费。需要说明的是,使用python语言进行RabbitMq客户端程序的编写需要导入pika模块。
(1)生产消息
1)首先,建立AMQP连接。因为AMQP是一种网络协议,所以使用RabbitMQ的第一步就是建立连接:
1 2 3 4 5 |
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel(); |
其中,
a)连接参数只指定了host,在前面介绍AMQP的章节中,我们知道建立一个AMQP连接还需要一些其他参数。如vhost参数这里使用了默认值’/’,port使用默认值5672端口,用户名和密码使用默认值’guest’等等。
b)同样地,在介绍AMQP时,我们知道建立连接之后,进行消息的生产之前,我们还必须要打开一个通道。
2)接着,声明一个队列:
1 |
channel.queue_declare(queue='hello') |
其中,指定了队列的名字为hello。当然,你也可以不去声明这个队列,让消费者去声明。但是这样你将面临消息被丢弃的风险(如果消费者应用程序比生产者应用程序先执行,可以规避这个风险)。
3)然后,生产一个消息:
1 2 3 4 |
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print("Sent 'Hello World!'") |
其中,
a)指定默然交换器exchange=””。在对AMQP进行学习时,我们知道:生产者生产的消息并不是直接生产至队列中去的,而是经过交换器,再由交换器路由至相应的队列中。默认交换器的特性可参考前一篇文章。
b)指定消息的路由关键字为”hello”。介于默认交换器的特性,此处指定的路由关键字”hello”其实就是待路由队列的名字。
c)生产的消息内容为”Hello World!”。
4)最后,关闭连接:
1 |
connection.close() |
(2)消费消息
1)首先,建立连接。跟消费者一样,生产者也需要向消息代理发起AMQP连接。
2)接着,声明队列:
1 |
channel.queue_declare(queue='hello') |
3)然后,进行消费:
1 2 3 4 5 6 7 8 |
def callback(ch, method, properties, body): print("Recvived %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming() |
其中,
a)定义回调函数callback(),指定消费消息时的消费行为。
b)进行消费。指定回调函数、队列名等。
c)此处未关闭连接,消费者将一直等待消息。
(3)运行
生产者源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel(); channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print("Sent 'Hello World!'") connection.close() |
消费者源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("Recvived %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming() |
先执行消费者应用程序,再执行生产者应用程序:
1 2 3 4 5 6 |
leo@ubuntu 01$ python receive.py Waiting for messages. To exit press CTRL+C Recvived 'Hello World!' leo@ubuntu 01$ python send.py Sent 'Hello World!' |
3.其他
本文主要介绍了RabbitMQ的基本使用方法:建立连接、声明队列、生产/消费消息以及关闭连接内容。
参考:
http://www.rabbitmq.com/tutorials/tutorial-one-python.html