import threading
import time
from queue import Empty, Queue
class Producer(threading.Thread):
    """Worker thread that turns queued inputs into "connection" objects.

    Items are taken from ``ip_queue``; for each one a connection object
    (currently the item itself) is put on ``conn_queue``.
    """

    def __init__(self, ip_queue, conn_queue, name=None):
        """Create the worker.

        Args:
            ip_queue: Queue the worker reads inputs from.
            conn_queue: Queue the worker writes connection objects to.
            name: Optional thread name; ``None`` keeps the default name
                generated by ``threading.Thread``.
        """
        super().__init__(name=name)
        self.ip_queue = ip_queue
        self.conn_queue = conn_queue

    def push_ip(self, ip):
        """Put ``ip`` on the input queue."""
        self.ip_queue.put(ip)

    def gen_conn_obj(self, ip):
        """Build the connection object for ``ip`` and queue it.

        The connection object is currently just ``ip`` itself.
        """
        conn = ip
        self.push_conn(conn)
        # Wrap in a 1-tuple so a tuple-valued ``conn`` is formatted as one
        # value instead of being unpacked as several format arguments.
        print('生成 %s 成功' % (conn,))

    def push_conn(self, conn):
        """Put ``conn`` on the output queue."""
        self.conn_queue.put(conn)

    def run(self) -> None:
        """Process inputs until the input queue is observed empty.

        Every item taken from the queue is acknowledged with ``task_done()``
        even when processing fails, so ``ip_queue.join()`` cannot block
        forever on a failed item. A failure is printed and the worker moves
        on to the next item.
        """
        while True:
            try:
                ip = self.ip_queue.get_nowait()
            except Empty:
                # Another worker drained the queue first; nothing left to do.
                break

            try:
                self.gen_conn_obj(ip)
            except Exception as exc:
                # Best-effort worker: report the failure and keep going.
                print(exc)
            finally:
                self.ip_queue.task_done()

            if self.ip_queue.empty():
                print('produce全部结束')
                break
class Consumer(threading.Thread):
    """Worker thread that drains connection objects from a queue.

    The worker never waits for new items: it stops the first time it finds
    the queue empty, whether or not anything will be added later.
    """

    def __init__(self, conn_queue, name=None):
        """Create the worker.

        Args:
            conn_queue: Queue the worker reads connection objects from.
            name: Optional thread name; ``None`` keeps the default name
                generated by ``threading.Thread``.
        """
        super().__init__(name=name)
        self.conn_queue = conn_queue

    def run(self) -> None:
        """Consume items until the queue is observed empty.

        Every item taken from the queue is acknowledged with ``task_done()``
        even when handling it fails, so ``conn_queue.join()`` cannot block
        forever on a failed item.
        """
        while True:
            try:
                res = self.conn_queue.get_nowait()
            except Empty:
                # Queue already drained (possibly by another consumer).
                break

            try:
                # Wrap in a 1-tuple so a tuple-valued item is formatted as
                # one value instead of being unpacked as format arguments.
                print('消费 %s 成功' % (res,))
            except Exception as exc:
                # Best-effort worker: report the failure and keep going.
                print(exc)
            finally:
                self.conn_queue.task_done()

            if self.conn_queue.empty():
                print('consumer全部结束')
                break
def main(producer_count=3, consumer_count=2, tasks=range(1, 5)):
    """Run the producer/consumer demo and return once all work is handled.

    Args:
        producer_count: Number of Producer threads to start.
        consumer_count: Number of Consumer threads to start.
        tasks: Iterable of items used to pre-fill the task queue.
    """
    # Queue of pending tasks.
    task_queue = Queue()
    # Queue of processed tasks.
    res_queue = Queue()

    # Pre-fill the task queue.
    for task in tasks:
        task_queue.put(task)

    # Start the producers, one thread each.
    producers = []
    for i in range(producer_count):
        p = Producer(ip_queue=task_queue, conn_queue=res_queue, name=' 生产者{:02d}'.format(i + 1))
        producers.append(p)
        p.start()

    # Start the consumers, one thread each.
    consumers = []
    for i in range(consumer_count):
        c = Consumer(conn_queue=res_queue, name=' 消费者{:02d}'.format(i + 1))
        consumers.append(c)
        c.start()

    # Wait on the threads themselves rather than on Queue.join(): the
    # workers always terminate, whereas Queue.join() blocks forever if any
    # item is left unacknowledged or unconsumed.
    for worker in producers:
        worker.join()
    for worker in consumers:
        worker.join()

    # A consumer stops the first time it sees an empty queue, so all of them
    # may have quit before the producers finished. Run one last consumer
    # pass in this thread to handle whatever is still queued.
    if not res_queue.empty():
        Consumer(conn_queue=res_queue).run()

    print('全部结束')
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == '__main__':
    print('start')
    main()
    print('end')
# Source (original article): https://www.cnblogs.com/liuhuan086/p/13360771.html