前言
虽然CPython因为GIL的存在导致多线程的并发计算能力大打折扣,但是在i/o密集型的场景时,使用多线程还是能带来效率上的飞跃。近期在使用多线程时遇到了主线程无法捕获子线程抛出的异常问题,这里记录一下解决的办法。
需求
将某一指定目录下所有的文件(包含子目录下的文件)中所有被$[]$
字符包裹起来的变量替换成指定的值,这是一个典型的io密集的场景,因此考虑使用多线程提升效率
原demo
1 | def main(): |
demo内直接使用python3.2版本以后引入的ThreadPoolExecutor库使用多线程,在子线程无异常时是正常运行的,但是在子线程出现异常时(比如子线程的内部逻辑里发现了不存在key时, except KeyError会捕获到异常),但是你会发现子线程异常终止了,主线程但是却没有异常抛出。
经过一番搜索,在python官方手册中了解到了原因:
参考官方文档链接:
https://docs.python.org/zh-cn/3/library/_thread.html
原因即为:
使用start()方法启动子线程时,解释器会为子线程开辟独立的栈空间,主线程自然就无法获取子线程栈的信息。当线程异常中止时,会自行退出而不会将此异常raise到主线程。那么得知了原因,就可以找到解决的办法了。思路是继承标准库的Thread类,进行一些小的改写封装。
修改后的demo
1 | class ReplaceThread(Thread): |
如此这般,在主线程里通过自定义的子线程返回值来判断子线程是否有异常,如果子线程有异常则主线程接替抛出子线程里的异常。这里另外还要注意,子线程的join()操作要放到start()操作全部完成了之后再进行,避免主线程被子线程阻塞,这样就变成了串行执行多线程就失去了意义了。
问题
这里是以迭代对象来循环启动多线程的,假设迭代对象数量很长,那就会启动成百上千个线程,这是不愿意看到的,为了避免这种情况,可自定义线程池,仅需对上方demo中的main()方法再做一点小改动即可。
实现
1 | def main(): |
复杂度优化:
上方的方法是对迭代的对象列表进行切片,每次只截取前10个对象,执行完这10个对象的操作后再截取随后后的10个对象,直到迭代对象为空。这种方式使用list的切片,时间复杂度为O(k),k为截取长度。
有没有更好的方式?这里列举一种复杂度更低的方式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25def main():
conf_files=['/etc/abc/', '/var/abc']
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
pool_size = 10
files = self.conf_files
file_num = len(files)
while files:
if file_num < pool_size:
pool_size = file_num
t_objs = []
for i in range(pool_size):
file = files.pop()
t = ReplaceThread(file=file, conf_map=conf_map)
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
if t.exitcode != 0:
os.system("rm -rf {}".format(tmp_conf_path))
raise Exception(t.exception)
file_num -= pool_size
说明:
利用了顺序表(list)的 尾部操作/获取长度操作 时间复杂度均为O(1)的特性,每次操作list的尾部元素,这个方式的复杂度更低
总结
子线程异常处理问题由此就得以解决,通过一些小改动也可以实现自定义的低复杂度线程池。重要的事情只说一遍:遇到问题查官方文档