线程
multiprocessing模块
Process( target=函数名,args=(参数,) )
对象.start()
就开启了一个进程 → 相当于给了操作系统一条指令if __name__ == ‘__main__‘
下。if __name__ == ‘__main__‘
下。对象.join
有一个参数可以将一个子进程设置为一个守护进程
# 守护进程随着主进程的代码结束而结束
import time
from multiprocessing import Process
def son1():
while True:
print(‘is alive‘)
time.sleep(0.5)
if __name__ == ‘__main__‘:
p = Process(target=son1)
p.daemon = True # 把子进程设置成了守护进程
p.start()
time.sleep(2)
import time
from multiprocessing import Process
def son1():
while True:
print(‘is alive‘)
time.sleep(0.5)
def son2():
for i in range(5):
print(‘in son2‘)
time.sleep(1)
if __name__ == ‘__main__‘:
p1 = Process(target=son1)
p1.daemon = True
p.start()
p2 = Process(target=son2)
p2.start()
time.sleep(2)
import time
from multiprocessing import Process
def son1():
while True:
print(‘is alive‘)
time.sleep(0.5)
if __name__ == ‘__main__‘:
p = Process(target=son1)
p.start() # 异步非阻塞
print(p.is_alive()) # 判断子进程是否处于运行中
time.sleep(1)
p.terminate() # 异步非阻塞:强制结束一个子进程
print(p.is_alive()) # True,因为操作系统还没来得及关闭
time.sleep(0.01)
print(p.is_alive()) # False,操作系统已经响应了关闭进程的需求
什么是异步非阻塞?
import os
from multiprocessing import Process
class MyProcess(Process): # 必须继承Process类
def run(self): # 必须有run方法
print(os.getpid(), os.getppid())
if __name__ == ‘__main__‘:
mp = MyProcess()
mp.start()
print(‘main:‘, os.getpid())
import time
from multiprocessing import Process
class MyProcess1(Process):
def run(self):
for i in range(5):
print(‘子进程要执行的代码封装进run方法中‘)
class MyProcess2(Process):
def run(self):
while True:
print(‘自定义类中必须有run方法‘)
time.sleep(0.2)
if __name__ == ‘__main__‘:
mp1 = MyProcess1()
mp1.start()
mp2 = MyProcess2()
mp2.daemon = True
mp2.start()
print(‘自定义类必须继承Process类‘)
time.sleep(1)
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, x, y):
self.x = x
self.y = y
super().__init__()
def run(self):
print(self.x, self.y)
for i in range(5):
print(‘666‘)
time.sleep(0.2)
if __name__ == ‘__main__‘:
mp = MyProcess(1, 2)
mp.start()
print(‘参数传递‘)
import os
from multiprocessing import Process
def func(arg):
print(arg)
class MyProcess(Process):
def __init__(self, target, args=()):
super().__init__(target=target, args=args)
if __name__ == ‘__main__‘:
mp = MyProcess(func, (‘必须是元组‘,))
mp.start()
print(‘面向对象的方式开启子进程‘)
Process类
开启进程的方式
面向函数
def 函数名():
‘要在子进程中执行的代码‘
p = Process(target=函数名, args=(参数,))
p.start()
面向对象
class 类名(Process):
def __init__(self, 参数1, 参数2): ‘如果子进程不需要参数则不写‘
self.a = 参数1
self.b = 参数2
super().__init__()
def run(self):
‘要在子进程中执行的代码‘
# 创建进程对象
p = 类名(参数1, 参数2)
# Process类提供的操作进程的方法
p.start() ‘开启子进程,异步非阻塞‘
p.terminate() ‘结束子进程,异步非阻塞‘
p.join() ‘等子进程结束,同步阻塞‘
p.isalive() ‘获取当前子进程的状态‘
daemon = True ‘设置为守护进程,永远在主进程的代码执行结束之后自动结束‘
在一个并发的场景下,涉及修改共享数据资源时,则需要加锁来维护数据的安全。
import time
import json
from multiprocessing import Process, Lock # 导入Lock类
def search_ticket(user):
with open(‘test‘, ‘r‘, encoding=‘utf-8‘) as f:
dic = json.load(f)
print(‘%s查询结果: %s张余票‘ % (user, dic[‘count‘]))
def buy_ticket(user, lock):
‘with lock:‘ # with语句加锁
lock.acquire() # 给这段代码加锁
time.sleep(0.02)
with open(‘test‘) as f:
dic = json.load(f)
if dic[‘count‘] > 0:
print(‘%s买到票‘ % (user,))
dic[‘count‘] -= 1
else:
print(‘%s没买到票‘ % (user,))
time.sleep(0.02)
with open(‘test‘, ‘w‘) as f:
json.dump(dic, f)
lock.release() # 给这段代码解锁
def task(user, lock):
search_ticket(user)
buy_ticket(user, lock)
if __name__ == ‘__main__‘:
lock = Lock()
lst = []
for i in range(1, 11):
p = Process(target=task, args=(‘user%s‘ % i, lock))
p.start()
lst.append(p)
for i in lst:
i.join()
with open(‘test‘, ‘w‘) as f:
dic = {"count": 1}
json.dump(dic, f)
import time
import json
from multiprocessing import Process, Lock
def search_ticket(user):
with open(‘test‘, ‘r‘, encoding=‘utf-8‘) as f:
dic = json.load(f)
print(‘%s查询结果: %s张余票‘ % (user, dic[‘count‘]))
def buy_ticket(user):
time.sleep(0.02)
with open(‘test‘) as f:
dic = json.load(f)
if dic[‘count‘] > 0:
print(‘%s买到票‘ % (user,))
dic[‘count‘] -= 1
else:
print(‘%s没买到票‘ % (user,))
time.sleep(0.02)
with open(‘test‘, ‘w‘) as f:
json.dump(dic, f)
def task(user, lock):
search_ticket(user)
with lock: # 推荐with语句加锁,自带异常处理功能。
buy_ticket(user)
if __name__ == ‘__main__‘:
lock = Lock()
lst = []
for i in range(1, 11):
p = Process(target=task, args=(‘user%s‘ % i, lock))
p.start()
lst.append(p)
for i in lst:
i.join()
with open(‘test‘, ‘w‘) as f:
dic = {"count": 1}
json.dump(dic, f)
同步存在的意义
总结
lock = Lock()
with lock
with lock
相当于lock.acquire()
和lock.release()
multiprocessing模块
中导入Queue
实现Queue
基于socket模块
、pickle模块
、Lock模块
实现Pipe
基于socket模块
、pickle模块
实现# 进程之间的数据隔离
from multiprocessing import Process
n = 100
def func():
global n
n -= 1
print(n)
if __name__ == ‘__main__‘:
lst = []
for i in range(3):
p = Process(target=func)
p.start()
lst.append(p)
for p in lst:
p.join()
print(‘main‘,n)
结果:
99
99
99
main 100
from multiprocessing import Queue, Process
def func(exp, q):
ret = eval(exp)
q.put(ret) # 储存结果,先进先出。
if __name__ == ‘__main__‘:
q = Queue() # 创建对象
Process(target=func,args=(‘1+2+3‘, q)).start() # 传入参数
print(q.get()) # 获取结果
# Queue内部基于文件家族socket、pickle、Lock实现。
from multiprocessing import Queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 在队列为空的时候会发生阻塞
from multiprocessing import Queue
q = Queue(2) # 设置队列大小
q.put(1)
q.put(2)
print(‘显示‘)
q.put(3) # 当队列为满的时候向队列中放数据,会阻塞:不报错且不丢失数据。
print(‘不显示‘)
import queue
from multiprocessing import Queue
q = Queue(2)
q.put(1)
q.put(2)
print(‘显示‘)
try:
q.put_nowait(3) # 当队列为满的时候用该方法放数据,不阻塞:系统会报错且会丢失数据。
except queue.Full:
pass
print(‘不显示‘)
print(q.get())
print(q.get())
print(q.get()) # 阻塞
import queue
from multiprocessing import Queue
q = Queue(2)
q.put(1)
q.put(2)
print(‘显示‘)
try:
q.put_nowait(3)
except queue.Full:
pass
print(‘不显示‘)
print(q.get())
print(q.get())
try:
print(q.get_nowait()) # 队列为空的时候,不阻塞:系统会报错。
except queue.Empty:
pass
原文:https://www.cnblogs.com/elliottwave/p/12656168.html