Python支持多线程,但是由于GIL的限制并不能无限制的开启子线程。
通过semaphore我们可以控制子线程对于共享资源的访问,即可以阻塞一些子线程直到有空余的semaphore资源,但是并不能实际限制子线程数量。
Python2中不能用方便的concurrent.futures,可以使用队列/池的思想来处理这个问题;
根据个人经验,当开启的线程数超过100个后触发问题的概率就会变高(实际上限还受单个子线程工作量的影响)。
为此我需要总结一套通用的多线程限制方式:
if __name__ == ‘__main__‘: resources= get_all_resources() thread_pool = [] i = 0 while i < len(resources): if len(thread_pool) < 50: thread_pool.append(Thread(target=handle_resource, args=(resources[i],), name=resources[i])) i += 1 for t in thread_pool: if not t.daemon: t.setDaemon(True) t.start() elif not t.isAlive(): thread_pool.remove(t) else: pass if len(thread_pool) >= 50: # print "Thread pool size: %d, sleep 3s..." % len(thread_pool) time.sleep(3) continue for t in thread_pool: t.join()
上述Demo中,将线程池大小设置为50。
当线程池未满,直接创建新的子线程,然后在线程池中把未启动的启动,把执行完毕的剔除。
当线程池已满,便等待3秒,之后继续检查线程池看看能不能空出坑位。
当所有resources遍历完毕后,说明此时至多只有50个线程还需要执行,此时join()等待所有子线程执行完毕后结束主线程。
一些错误和总结:
一、子线程使用http request访问API,当线程数多于50个时偶尔会出现:
ConnectionError: HTTPConnectionPool(host=‘...‘, port=80): Max retries exceeded with url: ... (Caused by NewConnectionError(‘<urllib3.connection.HTTPConnection object at 0x0E1CC870>: Failed to establish a new connection: [Errno 10048] ...‘,))
即便在请求header中添加:Connection: close也是无济于事,通过限制线程池大小可以解决。
二、if len(thread_pool) <50/>=50的判断条件为何不能使用if...else写在一起?而是要分开。
假设if else写在的一起:
如果把中间的for循环放在if else之后,那么会出现线程永远不启动的问题。
如果把中间的for循环放在if else之前,那么会出现最后一批子线程无法启动的问题。
三、是否有现成的标准库或第三方库实现上述功能?
最好的是使用python3的标准库concurrent.futures,一个future对象其实就是一个子线程,通过其线程池功能,我们可以像使用threading模块那样使用concurrent.futures,只是方法名和使用方式有些许差异。并且threading模块里的Lock和信号量等同步原语也可以直接在concurrent.futures的代码中照常使用。
另外python2中也有threadpool这个第三方库,但是现在已经很少用了,连官网都404了。
四、为什么不直接使用multiprocessing的池功能?
multiprocessing比较适合并行的执行几个cpu-bound的大任务,而实际上使用并行时通常都是一些短促的任务,例如api访问,文件读写等。这种时候使用threading要比multiprocessing快得多。所以平时还是使用threading模块和concurrent.futures的场景较多。
原文:https://www.cnblogs.com/leohahah/p/14955947.html