初识RabbitMQ消息队列

RabbitMQ是对AMQP的实现,什么是AMQP?AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

1.RabbitMQ定义

简言之,RabbitMQ是一个可以用来接收和转发消息的消息代理中间件。举个栗子,你可以把RabbitMQ的机制想象成一个邮局:当你把一封邮件放进邮箱之后,邮递员就会将你的邮件转发给邮件的接收者。这里,RabbitMQ就是承担着邮箱、邮局和邮递员的功能。

2.RabbitMQ相关术语

(1)producer:消息的生产者,用来发送消息的一方。

(2)consumer:消息的消费者,用来接收消息的一方。

(3)queue:队列。它好比我们前面举得例子中的邮箱,是用来存放producer生产的消息的。队列只受主机的内存和硬盘的影响,它本质上是一个很大的消息缓冲区。Producers向队列中生产消息,consumers从队列中消费消息。

(4) channel:通道,建立连接之后必须建立通道才能生产和消费消息,一个连接内可以建立多个通道。

(5)exchange:消息交换机,他指定消息按什么规则发送到指定的队列queue中。有四种类型:direct, fanout, topic, headers。

(6) routing_key:路由关键字,exchange根据这个关键字投递,消费者也可以根据这个关键字消费。

(7) 其他,可以查看官方文档。

 

3.RabbitMQ的实现机制

RabbitMQ的实现机制就好比生产者-消费者模型。如下示例可以直观的理解:生产者发送消息到名为hello的queue中,然后消费者从这个queue中接收消息。11

4.RabbitMQ C接口

RabbitMQ提供了很多编程语言的api,这里主要例举了C语言的一些常用的api,其他未例举的api,读者可自行查阅官方文档。

(1)amqp_new_connection

  • 函数原型:amqp_connection_state_t amqp_new_connection(void);
  • 函数功能:声明一个新的连接
  • 函数参数:void
  • 返回值:成功返回一个新的连接;失败返回NULL

(2)amqp_open_socket

  • 函数原型:int amqp_open_socket(char const *hostname, int port);
  • 函数功能:打开一个新socket
  • 函数参数:hostname——主机名或IP地址;port——连接的端口号,一般是5672。
  • 返回值:成功返回一个正整数,即套接字的文件描述符;失败返回负数。
  • 说明:该函数被amqp_socket_open()取代了

(3)amqp_set_socket

  • 函数原型:void amqp_set_socket(amqp_connection_state_t state, int sockfd);
  • 函数功能:将连接(state)与socket文件描述符(sockfd)关联起来。
  • 函数参数:state——打开的连接;sockfd——打开的套接字
  • 返回值:void

(4)amqp_tcp_socket_new

  • 函数原型:amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state);
  • 函数功能:创建一个新的TCP套接字socket
  • 函数参数:state——建立的连接
  • 返回值:成功返回一个指向新套接字的指针;失败返回NULL

(5)amqp_socket_open

  • 函数原型:int amqp_socket_open(amqp_socket_t *self, const char *host, int port)
  • 函数功能:打开一个socket连接
  • 函数参数:self——amqp_tcp_socket_new函数返回的socket对象;host——主机名;port——端口号
  • 返回值:成功返回AMQP_STATUS_OK;失败返回amqp_status_enum枚举型。

(6)amqp_login

  • 函数原型:amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, …);
  • 函数功能:登陆RabbitMQ server
  • 函数参数:
    • state——打开的连接;
    • vhost——rabbitmq的虚机主机,是rabbitmq进行权限管理的最小单位,默认是”/”;
    • channel_max——一个连接的最大channel数,0表示无限制;
    • frame_max——一个连接的最大frame,最小为4096,最大是2^32-1,建议默认设置为131072较好;
    • heartbeat——默认填0即可;
    • sasl_method——默认填AMQP_SASL_METHOD_PLAIN;
    • …——上一个参数填写为默认值AMQP_SASL_METHOD_PLAIN后,省略号处填写两个参数,分别为登陆的用户名和密码。
  • 返回值:成功或失败都返回一个amqp_rpc_reply_t结构体。若reply_type == AMQP_RESPONSE_NORMAL,则登陆成功;若r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION表示登陆失败…

(7)amqp_channel_open

  • 函数原型:amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
  • 函数功能:打开一个通道,关联连接state和通道channel
  • 函数参数:state——打开的连接;channel——通道
  • 返回值:返回amqp_channel_open_ok_t结构体

(8)amqp_exchange_declare

  • 函数原型:amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t auto_delete, amqp_boolean_t internal, amqp_table_t arguments);
  • 函数功能:声明一个exchange
  • 函数参数:
    • state——如前所述;
    • channel——如前所述;
    • exchange——声明的exchange名;
    • type——exchange的类型(direct、fanout、topic),不同类型对应不同的投递算法;
    • passive——布尔类型,0表示false,其他表示true;
    • durable——同上,是否持久化,若是,exchange将在server重启前一直有效;
    • auto_delete——同上,是否自动删除,若是,exchange将在于其绑定的队列都删除之后删除它自己;
    • internal——同上,布尔值
    • argument——一般写为amqp_empty_table。
  • 返回值:返回一个amqp_exchange_declare_ok_t结构体

(9)amqp_queue_declare

  • 函数原型:amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t argument);
  • 函数功能:声明一个queue
  • 函数参数:
    • state——打开的连接;
    • channel——管道;
    • queue——队列;
    • passive——布尔类型,0表示false,其它表示true;
    • durable——同上,是否持久,若是,queue将在server重启前一直有效;
    • exclusive——同上,若是,queue将只能被声明它的消费者使用;
    • auto_delete——同上,若是,queue将在其消费者停止使用之后自动删除自己;
    • argument——一般写为amqp_empty_table。
  • 返回值:返回一个amqp_queue_delcare_ok_t结构体

(10)amqp_queue_bind

  • 函数原型:amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments);
  • 函数功能:声明一个queue
  • 函数参数:
    • state——打开的连接;
    • channel——管道;
    • queue——队列;
    • exchange——exchange;
    • routing_key——路由规则;
    • argument——一般写为amqp_empty_table。
  • 返回值:返回一个amqp_queue_delcare_ok_t结构体

(10)amqp_basic_publish

  • 函数原型:int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body);
  • 函数功能:生产一个消息
  • 函数参数:
    • state——建立的连接;
    • channel——通道;
    • exchange——交换机exchange;
    • routing_key——路由规则;
    • mandatory——indicate to the broker that the message must be delivered to a queue;
    • immediate——indicate to the broker that the message must be delivered to a consumer immediately;
    • properties——属性结构体,可以用来设置消息的属性;
      body——消息体。
  • 返回值:成功返回AMQP_STATUS_OK;失败返回amqp_status_enum枚举型。

(11)amqp_basic_qos

  • 函数原型:amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
  • 函数功能:qos(quanity of service),用于控制预取消息数,避免消息按条数均匀分配,需要和no_ack配合使用。
  • 函数参数:
    • state——建立的连接;
    • channel——通道;
    • prefetch_size——预取消息的大小;
    • prefecth_count——预取消息的条数;
    • global——布尔值。
  • 返回值:返回amqp_basic_qos_ok_t结构体。

(12)amqp_basic_consume

  • 函数原型:amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusice, amqp_table_t argument);
  • 函数功能:开始消费
  • 函数参数:
    • state——建立的连接;
    • channel——打开的通道;
    • queue——队列;
    • consumer_tag——一般设置为amqp_empty_bytes;
    • no_local——布尔值,0表示false,其它表示true;
    • no_ack——布尔值,是否需要确认消息后再从队列中删除消息;
    • exclusive——布尔值;
    • arguments——一般置为amqp_empty_table。
  • 返回值:返回一个amqp_basic_consume_ok结构体。

(13)amqp_consume_message

  • 函数原型:amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, struct timeval *timeout, int flags);
  • 函数功能:读取、消费消息;
  • 函数参数:
    • state——建立的连接;
    • envelope——存储消息的结构体;
    • timeout——超时时间;
    • flags——填0,未使用。
  • 返回值:返回amqp_rpc_reply_t结构体。

(14)amqp_destroy_envelope

  • 函数原型:void amqp_destroy_envelope(amqp_envelope_t *envelope);
  • 函数功能:销毁envelope结构体。
  • 函数参数:envelope——存储接收消息的结构体
  • 返回值:void

(15)amqp_channel_close

  • 函数原型:amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, int code);
  • 函数功能:关闭通道channel。
  • 函数参数:state——建立的连接;channel——通道;code——一般置为AMQP_REPLY_SUCCESS
  • 返回值:返回amqp_rpc_reply_t结构体。

(16)amqp_connectionc_close

  • 函数原型:amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code);
  • 函数功能:关闭连接。
  • 函数参数:state——建立的连接;code——一般置为AMQP_REPLY_SUCCESS。
  • 返回值:返回amqp_rpc_reply_t结构体。

(17)amqp_destroy_connection

  • 函数原型:int amqp_destroy_connection(amqp_connection_state_t state);
  • 函数功能:销毁连接
  • 函数参数:state——建立的连接
  • 返回值:成功返回AMQP_STATUS_OK;失败返回amqp_status_enum枚举型。

(18)其他

此处未例举的接口,可以自行查看文件<Amqp.h>, <amqp_framing.h>, <Amqp_tcp_socket.h>

5.RabbitMQ一般使用步骤

(1)首先,创建一个新的连接、创建一个tcp套接字、打开套接字;

(2)然后,登陆RabbitMQ服务;

(3)打开通道channel、声明一个交换机exchange、声明队列queue;

(4)发布消息到exchange上/从queue中消费消息。

6.示例

为了直观的感受RabbitMQ的使用,下面示例在调用接口时并没有做任何错误判断。

(1)rabbitmq_sendstring.c

生产者将消息生产到exchange中,并且在生产的时候,指定了exchange的路由规则route_key(消费者绑定queue时指定route_key,消费者根据该rute_key才能消费)。

(2)rabbitmq_recvstring.c

消费者声明queue,并根据route_key将该queue与exchange绑定,进行消费。

(3)输出

发送端:

接收端:

6. 后续

这篇文章简单介绍了RabbitMQ的相关知识,通过阅读本文,您可以实现一个简单的消息队列实例。

然后,对于exchange的类型、属性以及队列的属性和消息的属性等重要内容却没有做深入介绍,后续有机会深入研究再记录下来。

符一些其他资料:

http://blog.csdn.net/samxx8/article/details/47417133
http://www.infoq.com/cn/articles/AMQP-RabbitMQ
http://www.diggerplus.org/archives/3110#comments

发表评论

电子邮件地址不会被公开。 必填项已用*标注