基于可杀死线程实现的timeout装饰器

下面的代码实现了3个有用的东西:

  • KillableThread:一个可以被 kill 的线程;调用 join 时会返回线程目标函数的返回值,并把线程内抛出的异常在调用 join 的线程中重新抛出。
  • timeout 装饰器:被装饰的函数超时后自动抛出 TimeoutError 异常。
  • retry 装饰器:发生异常时自动重试,可以指定重试次数(0 表示不重试);超过重试次数后,会把最后一次的异常向外抛出。

func_utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
import threading
import time
import inspect
import ctypes
import traceback
import sys, os
from functools import wraps

class KillableThread(threading.Thread):
    """Thread that can be killed and that reports its outcome through ``join``.

    ``join`` returns whatever the thread's target returned and re-raises, in
    the joining thread, any ``Exception`` the target raised.  ``kill``
    asynchronously injects ``SystemExit`` into the running thread.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return_value = None  # value returned by the target, if any
        self._exception = None     # Exception raised by the target, if any

    @staticmethod
    def _async_raise(tid, exctype):
        """Ask the interpreter to raise ``exctype`` inside the thread ``tid``.

        Args:
            tid: Identifier of the target thread (``Thread.ident``).
            exctype: Exception class to raise; an instance is reduced to its
                class.

        Raises:
            ValueError: No thread with this id was found.
            SystemError: More than one thread state was modified; the request
                is reverted before raising.
        """
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        # The C API takes the thread id as an unsigned long.
        thread_id = ctypes.c_ulong(tid)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        if res != 1:
            # More than one thread state was affected: call again with
            # exc=NULL to revert the effect before reporting the failure.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def kill(self):
        """Inject ``SystemExit`` into this thread.

        Raises:
            RuntimeError: The thread has not been started yet.
            ValueError: The interpreter does not know the thread id.
        """
        if self.ident is None:
            raise RuntimeError("cannot kill thread before it is started")
        KillableThread._async_raise(self.ident, SystemExit)

    def run(self):
        """Run the target, remembering its return value or its exception."""
        try:
            self._return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            # Only Exception subclasses are captured; the SystemExit injected
            # by kill() is a BaseException and is deliberately not stored.
            self._exception = e

    def join(self, timeout=None):
        """Wait for the thread and hand back its outcome.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` to wait
                until the thread terminates.

        Returns:
            The value returned by the target, or ``None`` if none has been
            recorded (for example because the wait timed out).

        Raises:
            Exception: Whatever ``Exception`` the target raised.
        """
        super().join(timeout)
        if self._exception is not None:
            raise self._exception
        return self._return_value


def _get_thread(tid):
    """Return the live thread whose ``ident`` equals ``tid``, or ``None``."""
    live_threads = threading.enumerate()
    return next((candidate for candidate in live_threads if candidate.ident == tid), None)


def _get_frame_stack(tid):
    """Return the current top frame of thread ``tid``, or ``None`` if absent."""
    # sys._current_frames() maps thread ids to frames, so a direct lookup
    # replaces the scan over its items.
    frames_by_thread = sys._current_frames()
    return frames_by_thread.get(tid)


def _get_formated_frame_stack(tid):
    """Render the call stack of thread ``tid`` as a multi-line string.

    The first line holds the thread name and id; each following line
    describes one frame as `` at <source line>(<file name>:<line number>)``.

    Args:
        tid: Identifier of the thread to describe.

    Returns:
        str: The formatted stack.  If the thread or its frame can no longer
        be found (for example because it has just finished), a placeholder
        is used instead of failing.
    """
    th = _get_thread(tid)
    stack = _get_frame_stack(tid)
    # The thread may have exited between the caller's check and this lookup.
    thread_name = th.name if th is not None else '<unknown>'
    info = ['%s thread_id=%d' % (thread_name, tid)]
    if stack is None:
        # extract_stack(None) would describe the *calling* thread's stack,
        # which would be misleading here.
        info.append(' (no frame stack available)')
    else:
        for filename, lineno, _, line in traceback.extract_stack(stack):
            info.append(' at %s(%s:%d)' % (line, os.path.basename(filename), lineno))
    return '\n'.join(info)


def timeout(seconds):
    """
    Decorator to execute a function with a specified timeout.

    The function runs in a daemon KillableThread. If it is still running
    after the time limit, the thread is killed and TimeoutError is raised
    with the thread's frame stack in the message.

    Args:
    - seconds (int): The time limit in seconds for the function to complete.

    Returns:
    - function: The decorated function.

    Raises:
    - TimeoutError: If the function does not complete within the specified time limit.
    - Any exception raised by the decorated function itself.

    Usage:
    @timeout(seconds=10)
    def my_function():
        # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            th = KillableThread(target=func, args=args, kwargs=kwargs)
            th.daemon = True
            th.start()
            th.join(seconds)
            if th.is_alive():
                formated_frame_stack = _get_formated_frame_stack(th.ident)
                try:
                    th.kill()
                except ValueError:
                    # The thread finished between the liveness check and the
                    # kill; the time limit was still exceeded.
                    pass
                raise TimeoutError(f"{repr(func)} timeout. Frame stack:\n{formated_frame_stack}")
            # Read the outcome only after the thread is known to be finished:
            # a value captured from the timed join above would be stale if the
            # thread completed right after that join gave up waiting.
            return th.join(0)
        return wrapper
    return decorator


def retry(retries=1, retry_interval=0):
    """
    Decorator to retry a function a specified number of times with a given interval between retries.

    Args:
    - retries (int): The number of times the function should be retried if it raises an exception. If set to 1, the function will be attempted initially and retried once. 0 or a negative value means no retry: the function is attempted exactly once.
    - retry_interval (int): The time interval in seconds to wait between retries.

    Returns:
    - function: The decorated function.

    Raises:
    - The original exception raised by the function if all retries fail.

    Usage:
    @retry(retries=2, retry_interval=2)
    def my_function():
        # Function body
    """
    # A negative count would make the attempt loop empty, so the function
    # would never run and the wrapper would silently return None.
    max_retries = max(retries, 0)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt >= max_retries:
                        # Out of retries: re-raise the last exception with
                        # its original traceback.
                        raise
                    time.sleep(retry_interval)
        return wrapper
    return decorator