首页 > 编程语言 > 详细

python模拟发送、消费kafka消息

时间:2021-08-22 14:32:33      阅读:30      评论:0      收藏:0      [点我收藏+]

参考文章: https://zhuanlan.zhihu.com/p/279784873

生产者代码:

import traceback

from kafka import KafkaProducer,KafkaConsumer
from faker import Faker

fake=Faker()
# 生产者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = [xx:9092,xx:9092,xxx:9092]
# 消费者
kafka_topic_group = "test-group-zeze" #消费者群组
def producer(num:int):
    producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
    phones = [fake.name()+"-"+str(i) for i in range(num)]
    for p in phones:
        msg = bytes(p, encoding=utf-8)
        # print("生成消息",msg)

      #同一个key的消息会被自动分配到同一个分区 future=producer.send(kafka_topic, key=b"test",value=msg)
      #加了监听事件是否成功发送后,执行速度很慢,所以这里去掉了
# try: # future.get(timeout=2) # except Exception as e: # traceback.print_stack() print("成功生产{}条消息".format(num)) producer.close()

消费者代码:

from kafka import KafkaConsumer


# 生产者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = [xx:9092,xx:9092,xx:9092] #xx为对应的ip地址
# 消费者
kafka_topic_group = "test-group-zeze" #消费者群组


def consumer():
    consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)

    for message in consumer:
        print ("开始消费:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    # consumer.close()

亲测单机生产10w消息耗时20秒内,单线程

技术分享图片

 

消费者没记录耗时,但是也非常快,kafka性能确实牛 

python模拟发送、消费kafka消息

原文:https://www.cnblogs.com/qtclm/p/15171594.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!