一、RabbitMQ简介
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。它是AMQP(高级消息队列协议)的标准实现。遵循Mozilla Public License开源协议。
MQ(message queuing)使用消息将应用程序连接起来。这些消息通过像RabbitMQ 这样的消息代理服务器在应用程序之间路由。这就像是在应用程序之间放置一个邮局。我们想要解决的这个问题是处理庞大的实时信息,并把它们快速路由到众多的消费者。我们要在不阻塞消息生产者的情况下做到这一点,同时也无须让生产者知道最终消费者是谁。RabbitMQ使用一种基于标准的方法来确保应用程序之间相互通信,而不管应用是用Python、PHP 还是Scala 编写的。
RabbitMQ结构图如下:

二、RabbitMQ安装
安装配置epel源
rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
安装erlang
yum -y install erlang
安装RabbitMQ
yum -y install rabbitmq-server
启动RabbitMQ /etc/init.d/rabbitmq-server start
安装API pip3 install pika
三、实现最简单的队列通信
send端
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘192.168.65.245‘)) # 连接消息队列服务器 8 channel = connection.channel() # 生成一个管道 9 10 # 声明queue 11 channel.queue_declare(queue=‘hello‘) # 在管道中生成一个队列,队列的名称叫hello 12 13 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 14 channel.basic_publish(exchange=‘‘, 15 routing_key=‘hello‘, 16 body=‘Hello flash!‘) # RabbitMQ不能直接往queue里放消息,必须先通过exchange 17 print(" [x] Sent ‘Hello flash!‘") 18 connection.close()
receive端
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘192.168.65.245‘)) 8 channel = connection.channel() 9 10 11 # You may ask why we declare the queue again ? we have already declared it in our previous code. 12 # We could avoid that if we were sure that the queue already exists. For example if send.py program 13 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 14 # practice to repeat declaring the queue in both programs. 15 channel.queue_declare(queue=‘hello‘) 16 17 18 def callback(ch, method, properties, body): 19 print(" [x] Received %r" % body) 20 21 channel.basic_consume(callback, # callback, 回调函数 22 queue=‘hello‘, 23 no_ack=True) 24 25 channel.start_consuming()

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
生产者代码
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘192.168.65.245‘))
channel = connection.channel()
# 声明queue
channel.queue_declare(queue=‘task_queue‘)
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
routing_key=‘task_queue‘,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消费者代码
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘192.168.65.245‘))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b‘.‘))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue=‘task_queue‘,
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上。
消息持久化
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘192.168.65.245‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print ‘ok‘
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=False)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
2、durable 消息不丢失
生产者代码
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.65.245‘))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue=‘hello‘, durable=True)
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent ‘Hello World!‘")
connection.close()
消费者代码
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.65.245‘))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue=‘hello‘, durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print ‘ok‘
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=False)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
原文:http://www.cnblogs.com/Rambotien/p/5591077.html