应用场景:
1. 通知,针对发送事件的描述,内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者管理员.
说明: 首先选择交换机,如果选择fanout交换机,则需要为每种告警传输类型(邮件/微信/手机/短信)创建队列,但同时也带来坏处就是每个消息都会发送到所有队列,导致告警消息发生时,被报警消息淹没,如果选择topic交换机,则可为其创建四种严重级别告警info/warning/problem/citical,但如果使用fanout类型交换机消息会发送到所有这四个级别队列,如果使用direct交换机,则四个严重等级会被定死,无法扩展,而topic交换机则允许我们在如上四个严重等级上加上类型,如当我们触发报警API时候路由键设置为critical.rate_limit,则消息不经会发送到cirtica.*l队列而且同时会被发送到*.rate_limite,至于针对每种类型怎么处理这就是消费者该干的事情了~
> 消费者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
# 创建凭证对象
credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
# 创建参数对象
conn_params = pika.ConnectionParameters(
# RabbitMQ服务地址
host=‘127.0.0.1‘,
# RabbitMQ服务端口
port=5672,
# RabbitMQ登录凭证
credentials=credentials,
# RabbitMQ虚拟主机
virtual_host=‘/‘
)
# 创建连接对象
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道对象
channel = conn_broker.channel()
# 创建日志交换机
channel.exchange_declare(
# 交换机名称
exchange="xmzoomeye_alerts",
# 交换机类型
exchange_type="topic",
# 如果同名交换机已存在依然返回成功,否则创建
passive=False,
# 声明为持久化交换机
durable=True,
# 交换机闲置也不会自动删除
auto_delete=False
)
# 创建info日志级别队列
channel.queue_declare(
# 队列名称
queue="info",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为持久化队列
durable=True,
# 声明为非私有队列
exclusive=False,
# 队列闲置也不会自动删除
auto_delete=False
)
# 创建warning日志级别队列
channel.queue_declare(
# 队列名称
queue="warning",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为持久化队列
durable=True,
# 声明为非私有队列
exclusive=False,
# 队列闲置也不会自动删除
auto_delete=False
)
# 创建problem日志级别队列
channel.queue_declare(
# 队列名称
queue="problem",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为持久化队列
durable=True,
# 声明为非私有队列
exclusive=False,
# 队列闲置也不会自动删除
auto_delete=False
)
# 创建cirtical日志级别队列
channel.queue_declare(
# 队列名称
queue="cirtical",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为持久化队列
durable=True,
# 声明为非私有队列
exclusive=False,
# 队列闲置也不会自动删除
auto_delete=False
)
# 创建rate_limit日志级别队列
channel.queue_declare(
# 队列名称
queue="cirtical_ratelimit",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为持久化队列
durable=True,
# 声明为非私有队列
exclusive=False,
# 队列闲置也不会自动删除
auto_delete=False
)
# 绑定队列
channel.queue_bind(
# 队列名称
queue="info",
# 交换机名称
exchange="xmzoomeye_alerts",
# 路由键名称
routing_key="info.*"
)
channel.queue_bind(
# 队列名称
queue="warning",
# 交换机名称
exchange="xmzoomeye_alerts",
# 路由键名称
routing_key="warning.*"
)
channel.queue_bind(
# 队列名称
queue="problem",
# 交换机名称
exchange="xmzoomeye_alerts",
# 路由键名称
routing_key="problem.*"
)
channel.queue_bind(
# 队列名称
queue="cirtical",
# 交换机名称
exchange="xmzoomeye_alerts",
# 路由键名称
routing_key="cirtical.*"
)
channel.queue_bind(
# 队列名称
queue="cirtical_ratelimit",
# 交换机名称
exchange="xmzoomeye_alerts",
# 路由键名称
routing_key="*.ratelimit"
)
def callback_wrapper(callback):
def wrapper(channel, method, header, body):
print ‘#{0}[{1}]>: {2}‘.format(method.consumer_tag, method.delivery_tag, body),
if header.content_type != ‘application/json‘:
print ‘with wrong content_type(application/json)‘
channel.basic_ack(delivery_tag=method.delivery_tag)
return
print ‘with correct content_type(application/json)‘
callback(channel, method, header, body)
# 发送消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
return wrapper
@callback_wrapper
def info_callback_handler(channel, method, header, body):
"""
channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
header : AMQP消息头信息,携带可选的消息元数据,如数据类型
body : 实际消息内容
"""
pass
return
@callback_wrapper
def warning_callback_handler(channel, method, header, body):
pass
return
@callback_wrapper
def problem_callback_handler(channel, method, header, body):
pass
return
@callback_wrapper
def cirtical_callback_handler(channel, method, header, body):
pass
return
@callback_wrapper
def cirtical_ratelimit_callback_handler(channel, method, header, body):
pass
return
# 作为指定队列消费者
channel.basic_consume(
info_callback_handler,
queue="info",
# 必须确认后再接收后续消息
no_ack=False,
consumer_tag="xmzoomeye_alerts_info"
)
channel.basic_consume(
warning_callback_handler,
queue="warning",
no_ack=False,
consumer_tag="xmzoomeye_alerts_warning"
)
channel.basic_consume(
problem_callback_handler,
queue="problem",
no_ack=False,
consumer_tag="xmzoomeye_alerts_problem"
channel.basic_consume(
cirtical_callback_handler,
queue="cirtical",
no_ack=False,
consumer_tag="xmzoomeye_alerts_cirtical"
)
channel.basic_consume(
cirtical_ratelimit_callback_handler,
queue="cirtical_ratelimit",
no_ack=False,
consumer_tag="xmzoomeye_alerts_cirtical_ratelimit"
)
# 循环调用回调函数接收处理消息
channel.start_consuming()
channel.close()> 生产者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() exchange = sys.argv[1] routekey = sys.argv[2] messages = sys.argv[3] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘application/json‘ # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=messages, # 发布到交换机 exchange=exchange, # 发布信息属性 properties=msg_props, # 发布信息时携带的路由键 routing_key=routekey )
说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py xmzoomeye_alerts cirtical.ratelimit ‘{"error": "cirtical rate limit"}‘,查看消费者端输出
扩展: 对于发后即忘的消息通信模式可轻而易举的扩展,如添加一个额外的队列绑定路由键*.*,将所有级别的告警记录都记录到数据库中,以便后期分析/汇总/压缩/分类/查询等操作~
2. 批量,针对大型数据集合的工作或者转换,这种类型的任务可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作.
> 消费者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
# 创建凭证对象
credentials = pika.PlainCredentials(‘guest‘, ‘guest‘)
# 创建参数对象
conn_params = pika.ConnectionParameters(
# RabbitMQ服务地址
host=‘127.0.0.1‘,
# RabbitMQ服务端口
port=5672,
# RabbitMQ服务凭证
credentials=credentials,
# RabbitMQ虚拟主机
virtual_host=‘/‘
)
# 创建连接对象
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道对象
channel = conn_broker.channel()
# 创建交换机
channel.exchange_declare(
# 交换机名称
exchange="salt-exchange",
# 交换机类型
type="direct",
# 如果同名交换机已存在依然返回成功
passive=False,
# 声明为持久化交换机
durable=False,
# 交换机闲置也不会自动删除
auto_delete=False
)
# 创建队列
channel.queue_declare(queue="salt")
# 绑定队列
channel.queue_bind(
# 队列名称
queue="salt",
# 交换机名称
exchange="salt-exchange",
# 路由键名称
routing_key="salt"
)
# 消息回调处理函数
def msg_consumer(channel, method, header, body):
# 发送消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
# 退出监听循环
if body == ‘exit‘:
channel.basic_cancel(consumer_tag="salt-consumer")
channel.stop_consuming()
else:
print ‘found notice: recive queue message {0}‘.format(body)
return
# 作为指定队列消费者
channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
# 循环调用回调函数接收处理消息
channel.start_consuming()> 生产者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() exchange = sys.argv[1] messages = sys.argv[2] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘application/json‘ # 消息持久化 msg_props.delivery_mode = 2 # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=messages, # 发布到交换机 exchange=exchange, # 发布信息属性 properties=msg_props, # 扇形交换机本身不需要路由键,但参数个数限制,随意推荐大家直接写#匹配所有 routing_key="#" )
说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py upload_pictures ‘{"image_id": 1, "user_id": 1, "image_path": "/xm-workspace/xm-webs/www.pic.com/data/images/73197d57-46a9-4d19-a48f-a44e0ad5e493.jpg"}‘,查看消费者端输出
扩展: 如上在WEB页面上上传完图片后,希望对图片进行生成缩略图/奖励上传用户积分/分享通知朋友圈等等,这几个任务之间是没有相互依赖关系的,不需要等待对方的结果才能继续执行,所以可以并行执行,扩展起来也非常容易,直接添加一个对应的队列和消费者即可,如要记录上传图片日志记录需求~,当发现一个创建缩略图的消费者跟不上节奏,直接在同台或异台服务器上再跑一个创建缩略图的消费者即可,任务会自动轮询分配,这一切对于用户是无感知的~
3. RPC,针对大量RPC请求使用消息来发回应答,AMQP消息头里有一个reply_to字段,生产者JSON RPC-API生成随机零时队列名存储到预发布RPC调用消息的头部reply_to字段到指定队列,然后在随机队列上监听响应数据,消费者JSON RPC-SRV接收到消息处理完毕后读取回调中header的reply_to字段,然后将响应发回零时队列,由于所有没有绑定交换机的队列都会自动绑定到匿名交换机,所以必用申请额外的交换机直接使用匿名交换机,消息一旦被接收,零时队列会自动被删除.至此完成一次RPC调用
> 消费者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
# 创建凭证对象
credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
# 创建参数对象
conn_params = pika.ConnectionParameters(
# RabbitMQ服务地址
host=‘127.0.0.1‘,
# RabbitMQ服务端口
port=5672,
# RabbitMQ登录凭证
credentials=credentials,
# RabbitMQ虚拟主机
virtual_host=‘/‘
)
# 创建连接对象
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道对象
channel = conn_broker.channel()
# 创建RPC交换机
channel.exchange_declare(
# 交换机名称
exchange="rpc",
# 交换机类型
exchange_type="direct",
# 如果同名交换机已存在依然返回成功,否则创建
passive=False,
# 声明为持久化交换机
durable=True,
# 交换机闲置也不会自动删除
auto_delete=False
)
# 创建ping任务队列
channel.queue_declare(
# 队列名称
queue="ping",
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为非持久化队列
durable=False,
# 声明为私有队列
exclusive=True,
# 队列闲置会自动删除
auto_delete=True
)
# 绑定队列
channel.queue_bind(
# 队列名称
queue="ping",
# 交换机名称
exchange="rpc",
)
# 请求回调
def api_request_ping(channel, method, header, body):
"""
channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
header : AMQP消息头信息,携带可选的消息元数据,如数据类型
body : 实际消息内容
"""
print ‘#{0}[{1}]>: {2}‘.format(
# 由于此处的header是生产者的,所以可通过header.reply_to获取随机队列名
header.reply_to,
method.delivery_tag,
body,
)
# 发送消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
# 发送响应对象
channel.basic_publish(
body="pong",
# 由于所有队列默认都会绑定到匿名交换机,非常方便直接发给它,它会根据传递过来的路由键原路返回
exchange="",
routing_key=header.reply_to
)
return
# 作为指定队列消费者
channel.basic_consume(
api_request_ping,
queue="ping",
# 必须确认后再接收后续消息
no_ack=False,
consumer_tag="ping_request"
)
# 循环调用回调函数接收处理消息
channel.start_consuming()
channel.close()> 生产者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
# 创建凭证对象
credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
# 创建参数对象
conn_params = pika.ConnectionParameters(
# RabbitMQ服务地址
host=‘127.0.0.1‘,
# RabbitMQ服务端口
port=5672,
# RabbitMQ登录凭证
credentials=credentials,
# RabbitMQ虚拟主机
virtual_host=‘/‘
)
# 创建连接对象
conn_broker = pika.BlockingConnection(conn_params)
# 获取信道对象
channel = conn_broker.channel()
# 创建匿名零时响应队列,返回唯一队列名称通过header的reply_to给消费者
queue = channel.queue_declare(
# 如果同名队列已存在依然返回成功,否则创建
passive=False,
# 声明为非持久化队列
durable=False,
# 声明为私有队列
exclusive=True,
# 队列闲置会自动删除
auto_delete=True
)
exchange = sys.argv[1]
messages = sys.argv[2]
# 创建配置对象
msg_props = pika.BasicProperties()
# 设置内容类型
msg_props.content_type = ‘application/json‘
# 将唯一响应队列名传递给消费者
msg_props.reply_to = queue.method.queue
# 发送消息
channel.basic_publish(
# 发布消息内容
body=messages,
# 发布到交换机
exchange=exchange,
# 发布信息属性
properties=msg_props,
# 消息路由键
routing_key="ping"
)
# 响应回调
def api_response_ping(channel, method, header, body):
"""
channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
header : AMQP消息头信息,携带可选的消息元数据,如数据类型
body : 实际消息内容
"""
print ‘#{0}[{1}]>: {2}‘.format(
# 由于此处返回的是消费者的header,所以不能使用header.reply_to而应该使用生成的随机唯一队列名
queue.method.queue,
method.delivery_tag,
body,
)
# 发送消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
return
# 作为指定队列消费者
channel.basic_consume(
api_response_ping,
# 从匿名零时队列中收取消息
queue=queue.method.queue,
# 必须确认后再接收后续消息
no_ack=False,
consumer_tag="ping_response"
)
# 循环调用回调函数接收处理消息
channel.start_consuming()
channel.close()说明: 测试非常简单,首先启动消费者python api_server.py,然后尝试执行生产者python rpc_client.py rpc ‘{"exec": "ping"}‘,查看消费者端输出
扩展: 可以轻易的通过创建队列和绑定的方式来扩展API以支持新的API方法,这样做的最大好处是任何一台服务器都无需对所有的API调用做应答,其它RPC服务器部署在同台或异台物理机器,而这一切对于用户是无感知的~
本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1878573
消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?
原文:http://xmdevops.blog.51cto.com/11144840/1878573