首页 > 其他 > 详细

RabbitMQ队列

时间:2019-04-03 19:34:03      阅读:157      评论:0      收藏:0      [点我收藏+]

1.队列的作用:

    - 应用解耦
    
      项目中的应用:
         登录注册时候,使用队列进行解耦,将原本串行的架构改为异步
    
    - 流量削峰
        QPS: 每秒访问的次数
        DAU: 日活跃用户数
        MAU: 月活跃用户数
        总用户数
        (1). 使用动态扩容,抗住并发请求流量
        (2). 秒杀活动, 抢票活动, 微博星轨

2.如何保证数据不丢失

    1. 在队列里,设置 durable=True, 代表队列持久化
        
    2. 在生产者端,设置
        properties = pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        )
    
    3. 在消费者端
        auto_ack = False  # 关闭自动确认, 消息处理完成后由消费者手动确认
        ch.basic_ack(delivery_tag=method.delivery_tag)

3.exchange模式

    fanout : 广播
    direct : 组播
    topic  : 规则播

普通模式:

技术分享图片
# Producer for the plain (default exchange) mode: publishes one persistent
# message to the durable queue 'test' on the broker at localhost.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

# Declare the queue; durable=True marks the queue itself as persistent.
channel.queue_declare(queue='test', durable=True)

channel.basic_publish(
    exchange='',  # empty exchange name; the queue name is used as routing key
    routing_key='test',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ),
)

print(" [x] Sent 'Hello World!'")

connection.close()
producer
技术分享图片
# Consumer for the plain (default exchange) mode: reads from the durable
# queue 'test' and acknowledges each message manually.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

# Same declaration as the producer, so either side can be started first.
channel.queue_declare(queue='test', durable=True)


def callback(ch, method, properties, body):
    """Print the received body, then acknowledge the delivery on `ch`."""
    print(" [x] Received %r" % body)
    # auto_ack is off below, so the delivery must be acknowledged explicitly.
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(
    queue='test', on_message_callback=callback, auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
consumer

fanout模式:

技术分享图片
# Producer for the fanout mode: publishes one message to the 'logs'
# fanout exchange. The message is taken from the command-line arguments.
import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# Join all arguments into one message; fall back to a default when none given.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

# The routing key is left empty for the fanout exchange.
channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)

connection.close()
producer
技术分享图片
# Consumer for the fanout mode: binds a private queue to the 'logs'
# fanout exchange and prints every message it receives.
import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Declare a queue with an empty name; exclusive=True makes it exclusive
# (unique) to this consumer. The actual queue name is read back below.
result = channel.queue_declare('', exclusive=True)

queue_name = result.method.queue
print("queue_name:", queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
consumer

direct模式:

技术分享图片
# Producer for the direct mode: publishes one message to the 'direct_logs'
# exchange, using the log level as the routing key.
# Usage: <script> [log_level] [message words...]
import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# First argument is the log level (routing key); defaults to 'info'.
log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info'

# Remaining arguments form the message; defaults to 'Hello World!'.
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='direct_logs', routing_key=log_levels, body=message)
print(" [x] Sent %r:%r" % (log_levels, message))
connection.close()
producer
技术分享图片
# Consumer for the direct mode: binds a private queue to 'direct_logs'
# once per log level given on the command line and prints what arrives.
# Usage: <script> [info] [warning] [error]
import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Empty name plus exclusive=True: a private queue whose actual name is
# read back from the declare result.
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

log_levels = sys.argv[1:]
if not log_levels:
    # At least one log level is required; print usage and exit with status 1.
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested level, all delivering into the same queue.
for severity in log_levels:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and body of each delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
consumer 

topic模式:

技术分享图片
# Producer for the topic mode: publishes one message to the 'topic_logs'
# exchange with a routing key taken from the command line.
# Usage: <script> [routing_key] [message words...]
import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# First argument is the routing key; defaults to 'anonymous.info'.
# The check is "> 1" so a routing key given without a message is honoured
# instead of being silently replaced by the default.
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

# Remaining arguments form the message; defaults to 'Hello World!'.
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
producer
技术分享图片
# Consumer for the topic mode: binds a private queue to 'topic_logs'
# once per binding key given on the command line and prints what arrives.
# Usage: <script> [binding_key]...
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Empty name plus exclusive=True: a private queue whose actual name is
# read back from the declare result.
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    # At least one binding key is required; print usage and exit with status 1.
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested key, all delivering into the same queue.
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and body of each delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
consumer

 

RabbitMQ队列

原文:https://www.cnblogs.com/xuechengeng/p/10650923.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!