在使用multiprocessing库实现多进程前,了解一下操作系统的相关知识:
Unix/Linux实现多进程
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但fork()调用一次,返回两次,因为操作系统自动把当前父进程复制了一份子进程,然后分别在父进程和子进程内返回。
子进程永远返回0,而父进程返回子进程的ID。这样,一个父进程可以fork出很多子进程,父进程记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程ID。
python的os模块封装了常见的系统调用,其中包括fork,可以在python程序中轻松创建子进程。
Windows实现多进程:
由于windows中没有fork调用,而如果我们需要在Windows上用python编写多进程的程序,就需要使用multiprocessing模块。
由于GIL(全局解释锁)的问题,python多线程并不能充分利用多核处理器。如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
multiprocessing可以给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。与threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
Process类可以创建新的子进程对象
p = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
参数:
group:分组,实际上很少用到
target:表示调用对象,即子进程要执行的任务,可以传入方法名
name:子进程名称
args:表示被调用对象的位置参数元组
kwargs:表示调用对象的字典
实例方法:
实例属性:
注意:
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱的问题,添加锁后,先获得锁的进程会阻塞后面的进程
l = multiprocessing.Lock():实例化一个锁对象
l.acquire():获得钥匙,此时数据只有当前进程可操作
l.release():返还要是,此时下一个先获得钥匙的进程可操作数据
上述的过程也可以使用关键字with加锁对象,将加锁的代码写入with代码块,如:
with i:
pass
# 等同于
l.acquire()
pass
l.release()
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个进程可以进行修改,即串行的修改,速度降低了但保证了数据安全
虽然可以用文件共享数据来实现进程之间的通信,但问题是:
效率低(共享数据基于文件,而文件是硬盘上的数据)
需要自己加锁处理
因此我们另寻一种方法解决这个问题,即multiprocessing模块所提供的基于消息的IPC通信机制:队列和管道
特点:高效且合适的处理好锁的问题
队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题解脱出来,尽量避免使用共享数据,尽可能的使用消息传递和队列,避免处理复杂的同步和锁的问题,而且在进程数目增多时,往往可以获得更好的可扩展性
互斥锁同时允许一个线程更改数据,而信号量是同时允许指定数量的线程更改数据
Semaphore相当于N把锁,获取其中一把即可。信号量的总数N在构造时传入,如果信号量为0,则进程堵塞,直到信号大于0,主要用来控制对共享资源的访问数量(进程池的最大连接数量)
信号量同步是基于一个内部的计数器,每调用一次acquire(),计数器减1,每调用一次release(),计数器加1,计数器最大为构造时传入的N,最小为0(此时acquire()调用会被堵塞)
信号量同步机制适用于访问像服务器、文件这样的有限资源
s = multiprocessing.Semaphore(N) # 创建信号量对象,锁池中有N个锁 s.acquire() # 取出一个锁,计数器-1 s.release() # 释放一个锁,计数器+1
线程事件用于主线程控制其他线程的执行,事件对象主要提供了三个方法set()、clear()、wait()、is_set()
事件的处理机制:
全局定义了一个变量,变量值默认为False,wait()方法起阻塞的作用,当变量值设置为True,wait()方法不再阻塞
e = multiprocessing.Event():构建事件对象
e.is_set():查看默认为False的变量
e.set():将变量设置为True
e.clear():将变量设置为False
e.wait([timeout]):若变量为False,此方法会使进程堵塞,若变量为True,此方法不会使进程堵塞,若设置参数timeout,等待指定时间后将不再阻塞
共享的多进程进程安全队列,可以实现多进程的数据传递
队列的底层由管道和锁构成
q = Queue(maxsize):创建队列对象,参数maxsize是队列中允许的最大项数,如果省略此参数则大小无限制
q.put(item):将item加入队列,如果当前队列已满,将会阻塞,直到有数据从管道中取出为止
q.put_nowait(item):将item加入队列,如果当前队列已满,不会阻塞,但会抛出异常
q.get():返回放入队列中的一项数据,队列是先进先出,如果当前队列为空,就会阻塞,直到有数据进来
q.get_nowait():返回放入队列中的一项数据,如果当前队列为空,不会阻塞,但会抛出异常
q.empty():判断队列是否为空,为空返回True,不为空返回False,如果其他进程或者线程正在往队列中添加数据,结果往往不可靠。在返回和使用之间,队列中可能已经发生了变化
q.size():返回队列中的数据量,同上不可靠
q.full():判断队列是否已满,同上不可靠
继承于Queue,但队列允许项目的消费者通知生产者项目已经成功处理,通知进程是用共享的信号和条件变量来实现的。
j = JoinableQueue(maxsize)
实例方法与Queue的实例方法大致相同,除此之外还提供了以下方法:
j.task_done():消费者使用这个方法发出信号,表示j.get()返回的项已经被处理,如果调用此方法的次数大于从队列中删除的项目数量,将抛出异常
j.join():生产者使用这个方法阻塞,直到队列中所有项全被处理,阻塞将持续到为队列中的每个项均调用j.task_done()为止
生产者消费者模式:
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题,该模型通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。
生产者指的是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者的速度大于消费者,那么生产者就需要等待消费者处理完,才能继续生产数据,反之消费者就必须等待生产者。为了解决上述问题从而引出生产者消费者模式。
生产者消费者模式是通过一个额外的容器解决两者间的强耦合问题,生产者和消费者不再直接通信,而是通过强阻塞队列进行通讯,所以生产者生产数据后不用等待消费者处理,而是直接扔给阻塞队列,消费者也不再向生产者要数据,而是从队列中提取,阻塞队列起到了缓冲的作用,从一定程度上平衡了两者的处理能力。
from multiprocessing import Process, JoinableQueue def consumer(name, queue): while 1: weapon = queue.get() print("\033[31m%s购买了%s" % (name, weapon)) queue.task_done() # 记录已处理了多少个数据 def producer(name, queue): for i in range(20): weapon = "\033[36m%s生产的第%s件商品\033[0m" % (name, i + 1) queue.put(weapon) # 放入生产的商品 queue.join() # 每生产一个商品便会阻塞,等待购买 if __name__ == ‘__main__‘: q = JoinableQueue(10) # 队列最大容量10 pro_1 = Process(target=producer, args=("torbjorn", q)) pro_2 = Process(target=producer, args=("symmetra", q)) pro_3 = Process(target=producer, args=("orisa", q)) con_1 = Process(target=consumer, args=("mccree", q)) con_2 = Process(target=consumer, args=("tracer", q)) p_list = [pro_1, pro_2, pro_3, con_1, con_2] con_1.daemon = True # 设置成守护进程,父进程结束时,两个消费者子进程理应结束 con_2.daemon = True [p.start() for p in p_list] # 启动这些进程 [p.join() for p in p_list[:3]] # 最后的结果是mccree和tracer争抢三家生产的商品,三家生产者同时生产
p = Pipe():返回的是一个管道的两端的元组对象
p.send(item):向管道中发送数据
p.recv(item):接收管道中的数据,如果管道中无数据且另一端关闭后仍接收数据将抛出异常
p.close():关闭管道
进程间的数据是独立的,可以借助队列或者管道实现通信,但二者都属于消息传递,可以通过Manager完成进程间的数据共享
m = Manager()
m.dict():创建一个字典的共享数据,可在另外的进程中直接使用
m.list():创建一个列表的共享数据,可在另外的进程中直接使用
...
注意:
在创建一个Manager对象时,程序额外启动了一个阻塞的server服务,以此实现多进程间的数据安全,即正常情况下不同进程对同一数据的操作是互斥的,一个进程向server请求数据,再把这部分数据修改,返回给server,之后server再去处理其他进程的请求。但是如果没有将数据返回给server,又向server请求数据,就会发生数据混乱,可以通过锁解决问题
在实际问题中,待执行的任务量可能是十分巨大的,不可能为每一个任务都去创建一个进程(创建进程和销毁进程是有开销的,操作系统也不会允许这么多进程的执行),这时就可以引入进程池的概念
进程池即一个容器,在里面放上固定数量的进程,有任务要处理时从池中取一个进程,等任务处理完毕后,进程并不会销毁,而是重新放回进程池中等待新的任务。如果任务量大时,任务会等待至进有程执空闲为止。进程池中的进程数量固定,那么不会出现上述问题,也节省了开闭进程的时间开销,一定程度上实现并发效果
p = Pool([numprocess [, initializer [, initargs]]]):创建进程池
参数:
实例方法:
apply_async和map_async方法的返回值是AsyncResult的实例对象,有以下方法:
apply_async()方法的注意事项:
进程池比手动开启多个进程效率要高(省去了销毁、调度的开销)
异步比同步效率要高
原文:https://www.cnblogs.com/eat-w/p/12078785.html