一、什么是线程
线程,有时被称为轻量进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。
二、进程与线程
三、线程的特点
四、内存中的线程
五、全局解释器锁GIL
六、python线程模块
七、threading模块
创建线程,方法1 from threading import Thread import time def func(n): time.sleep(1) print(n) for i in range(10): Thread(target=func, args=(i,)).start() 创建线程,方法2 from threading import Thread import time class MyThread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print(self.arg) for i in range(10): MyThread(i).start()
#服务端: from threading import Thread import socket sk = socket.socket() s = (‘127.0.0.1‘, 8081) sk.bind(s) sk.listen() def func(conn): while 1: se = input(‘>>>‘) if se == ‘q‘: break conn.send(se.encode(‘utf-8‘)) ret = conn.recv(1024).decode(‘utf-8‘) print(ret) conn.close() while 1: conn,addr = sk.accept() Thread(target=func, args=(conn,)).start() #客户端 import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8081)) ret = sk.recv(1024).decode(‘utf-8‘) print(ret) c = input().encode(‘utf-8‘) sk.send(c) sk.close()
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
守护线程 #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。 from threading import Thread import time def sayhi(name): time.sleep(2) print(‘%s say hello‘ %name) if __name__ == ‘__main__‘: t=Thread(target=sayhi,args=(‘hsr‘,)) #t.setDaemon(True) #必须在t.start()之前设置 t.start() print(‘主线程‘) print(t.is_alive()) 注释t.setDaemon(True) 即不设置为守护线程,结果: 主线程 True hsr say hello 设置为守护线程,结果: 主线程 True
#当多个线程同时去拿同一个数据时,造成数据的不安全 import time from threading import Thread,Lock def eat(): global n temp = n time.sleep(1) n = temp - 1 n = 10 t_lst = [] for i in range(10): t = Thread(target=eat) t.start() t_lst.append(t) [t.join() for t in t_lst] print(n) #通过加锁保证安全性 import time from threading import Thread,Lock def eat(l): global n l.acquire() temp = n time.sleep(1) n = temp - 1 l.release() n = 10 t_lst = [] l = Lock() for i in range(10): t = Thread(target=eat,args=(l,)) t.start() t_lst.append(t) [t.join() for t in t_lst] print(n)
指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程 例子: import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def e1(name): noodle_lock.acquire() print(‘%s,拿到面条‘%name) time.sleep(1) fork_lock.acquire() print(‘%s,拿到叉子‘%name) print(‘nice‘%name) fork_lock.release() noodle_lock.release() def e2(name): fork_lock.acquire() print(‘%s,拿到叉子‘%name) time.sleep(1) noodle_lock.acquire() print(‘%s,拿到面条‘ % name) print(‘nice‘%name) noodle_lock.release() fork_lock.release() Thread(target=e1,args=(‘hsr‘,)).start() Thread(target=e1,args=(‘张三‘,)).start() Thread(target=e2,args=(‘李四‘,)).start() Thread(target=e2,args=(‘王五‘,)).start()
Lock ->互斥锁 RLock ->递归锁 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。 import time from threading import Thread,RLock lock = RLock() def e1(name): lock.acquire() print(‘%s,拿到面条‘%name) time.sleep(1) lock.acquire() print(‘%s,拿到叉子‘%name) print(‘%snice‘%name) lock.release() lock.release() def e2(name): lock.acquire() print(‘%s,拿到叉子‘%name) time.sleep(1) lock.acquire() print(‘%s,拿到面条‘ % name) print(‘%snice‘%name) lock.release() lock.release() Thread(target=e1,args=(‘hsr‘,)).start() Thread(target=e1,args=(‘张三‘,)).start() Thread(target=e2,args=(‘李四‘,)).start() Thread(target=e2,args=(‘王五‘,)).start()
from threading import Semaphore,Thread import time def func(a,b,s): time.sleep(1) s.acquire() print(a+b) #同一时间只能由4个线程执行这个代码 s.release() sem = Semaphore(4) for i in range(10): t = Thread(target=func,args=(i,i+5,sem)) t.start()
以模拟检测网络状况,连接数据库 from threading import Event,Thread import time,random def check_net(e): time.sleep(random.randint(0,5)) e.set() def connect_db(e): count = 0 while True: if count >2: raise TimeoutError e.wait(1) #状态为False,只等2s if e.is_set() == True: print(‘----connect----‘) break else: count += 1 print(‘error‘) e = Event() t1 = Thread(target=connect_db,args=(e,)) t2 = Thread(target=check_net,args=(e,)) t1.start() t2.start()
#一个条件被创建时,默认False状态 #False会让wait处于等待状态 #notify相当于制造几把钥匙 #使用钥匙不会归还 from threading import Condition,Thread import time,random def func(c,i): #wait要先acquire c.acquire() c.wait() print(‘在第%s个循环里‘%i) c.release() c = Condition() for i in range(10): t = Thread(target=func, args=(c,i)) t.start() while 1: num = int(input(‘>>>‘)) #执行notify要先acquire c.acquire() c.notify(num) c.release()
from threading import Timer def func(): print(‘时间同步‘) Timer(2,func).start() #两秒后打印“时间同步”
#3种队列 #queue普通队列 #queue.LifoQueue先进后出队列(栈) #queue.PriorityQueue优先级队列 queue.PriorityQueue() #优先级队列 q.put((20,1)) q.put((10,2)) q.put((30,4)) q.get()#10 q.get()#1 q.get()#4 #优先级一样按ascii码排
concurrent.futures #1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数 # done() 判断某一个线程是否完成 # cancle() 取消某个任务 3.例子: from concurrent.futures import ThreadPoolExecutor import time def func(n): time.sleep(2) print(n) return n*n tpool = ThreadPoolExecutor(max_workers=20) #不要超过cpu个数*5 t_lst = [] for i in range(50): t = tpool.submit(func,i) #提交任务 t_lst.append(t) tpool.shutdown() #close + join print(‘主线程‘) for t in t_lst:print(t.result()) #获取返回值 4.回调函数 from concurrent.futures import ThreadPoolExecutor import time def func(n): time.sleep(2) print(n) return n*n def call_back(m): print(‘result:%s‘%m.result()) tpool = ThreadPoolExecutor(max_workers=5) #不要超过cpu个数*5 for i in range(20): t = tpool.submit(func,i).add_done_callback(call_back) #提交任务
原文:https://www.cnblogs.com/walthwang/p/10427744.html