Python子线程异常捕获 & Python自定义优化线程池

前言

虽然CPython因为GIL的存在导致多线程的并发计算能力大打折扣,但是在i/o密集型的场景时,使用多线程还是能带来效率上的飞跃。近期在使用多线程时遇到了主线程无法捕获子线程抛出的异常问题,这里记录一下解决的办法。

需求
将某一指定目录下所有的文件(包含子目录下的文件)中所有被$[]$字符包裹起来的变量替换成指定的值,这是一个典型的io密集的场景,因此考虑使用多线程提升效率

原demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def main():
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
conf_files=['/etc/abc/', '/var/abc']
thpool = ThreadPoolExecutor(5)
for file in conf_files:
thpool.submit(replace_config, file, conf_map)
thpool.shutdown(wait=True)

def replace_config(file, conf_map, tmp_conf_path):
with open(file, 'r') as f:
content = f.read()

# 需替换的变量的样式为: $[PASSWORD]$
wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
var_list = wrapper_pattern.findall(content)

for var in var_list:
try:
value = conf_map[var]
wrapper = "$[%s]$" % var
content = content.replace(wrapper, value)
except KeyError:
print('key error')
os.system("mv {} /tmp".format(tmp_conf_path))
raise Exception('[%s]文件中存在未知的key: %s' % (file, var))

with open(file, 'w') as f:
f.write(content)

demo内直接使用python3.2版本以后引入的ThreadPoolExecutor库使用多线程,在子线程无异常时是正常运行的,但是在子线程出现异常时(比如子线程的内部逻辑里发现了不存在key时, except KeyError会捕获到异常),但是你会发现子线程异常终止了,主线程但是却没有异常抛出。

经过一番搜索,在python官方手册中了解到了原因:
在这里插入图片描述

参考官方文档链接:
https://docs.python.org/zh-cn/3/library/_thread.html

原因即为:

使用start()方法启动子线程时,解释器会为子线程开辟独立的栈空间,主线程自然就无法获取子线程栈的信息。当线程异常中止时,会自行退出而不会将此异常raise到主线程。那么得知了原因,就可以找到解决的办法了。思路是继承标准库的Thread类,进行一些小的改写封装。

修改后的demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class ReplaceThread(Thread):
def __init__(self, file, conf_map):
super(ReplaceThread, self).__init__()
self.file = file
self.conf_map = conf_map

self.exitcode = 0
self.exception = None

def run(self):
try:
self._run()
except Exception as e:
# 如果捕获到异常,返回值改为不等于0
self.exitcode = 1
self.exception = e

def _run(self):
with open(self.file, 'r') as f:
content = f.read()

# 需替换的变量的样式为: $[PASSWORD]$
wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
var_list = wrapper_pattern.findall(content)

for var in var_list:
try:
value = self.conf_map[var]
wrapper = "$[%s]$" % var
content = content.replace(wrapper, value)
except KeyError:
raise Exception('[%s]文件中存在未知的key: %s‘ % (self.file, var))

with open(self.file, 'w') as f:
f.write(content)

# 主线程无法直接捕获子线程内的异常,因此自定义了Thread类,在子线程内定义其出现异常时的返回值,在主线程内根据返回值
# 来判断是否出现异常,并进行下一步操作
def main():
conf_files=['/etc/abc/', '/var/abc']
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
t_objs = []
for file in conf_files:
t = ReplaceThread(file=file, conf_map=conf_map)
t.start()
t_objs.append(t)

for t in t_objs:
t.join()
if t.exitcode != 0:
os.system("mv {} /tmp".format(tmp_conf_path))
raise Exception(t.exception)

如此这般,在主线程里通过自定义的子线程返回值来判断子线程是否有异常,如果子线程有异常则主线程接替抛出子线程里的异常。这里另外还要注意,子线程的join()操作要放到start()操作全部完成了之后再进行,避免主线程被子线程阻塞,这样就变成了串行执行多线程就失去了意义了。

问题
这里是以迭代对象来循环启动多线程的,假设迭代对象数量很长,那就会启动成百上千个线程,这是不愿意看到的,为了避免这种情况,可自定义线程池,仅需对上方demo中的main()方法再做一点小改动即可。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def main():
conf_files=['/etc/abc/', '/var/abc']
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}

# 实现的方法是对迭代的对象进行截取,每次只截取前10个对象,执行完这10个对象的操作后再截取随后后的10个对象,直到迭代对象为空。
n = 0
pool_size = 10
files = conf_files
file_num = len(files)
while files:
t_objs = []
start_index = n * pool_size
end_index = (n + 1) * pool_size

for i in range(pool_size):
current_index = start_index + i
if current_index < file_num:
file = self.conf_files[current_index]
t = ReplaceThread(file=file, conf_map=conf_map)
t.start()
t_objs.append(t)

for t in t_objs:
t.join()
if t.exitcode != 0:
os.system("rm -rf {}".format(base_conf_path))
raise Exception(t.exception)

n += 1
files = conf_files[end_index:]

复杂度优化:
上方的方法是对迭代的对象列表进行切片,每次只截取前10个对象,执行完这10个对象的操作后再截取随后后的10个对象,直到迭代对象为空。这种方式使用list的切片,时间复杂度为O(k),k为截取长度。

有没有更好的方式?这里列举一种复杂度更低的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def main():
conf_files=['/etc/abc/', '/var/abc']
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}

pool_size = 10
files = self.conf_files
file_num = len(files)
while files:
if file_num < pool_size:
pool_size = file_num

t_objs = []
for i in range(pool_size):
file = files.pop()
t = ReplaceThread(file=file, conf_map=conf_map)
t.start()
t_objs.append(t)

for t in t_objs:
t.join()
if t.exitcode != 0:
os.system("rm -rf {}".format(tmp_conf_path))
raise Exception(t.exception)

file_num -= pool_size

说明:

利用了顺序表(list)的 尾部操作/获取长度操作 时间复杂度均为O(1)的特性,每次操作list的尾部元素,这个方式的复杂度更低

总结

子线程异常处理问题由此就得以解决,通过一些小改动也可以实现自定义的低复杂度线程池。重要的事情只说一遍:遇到问题查官方文档

赏一瓶快乐回宅水吧~
-------------本文结束感谢您的阅读-------------