1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| import threading import time import inspect import ctypes import traceback import sys, os from functools import wraps
class KillableThread(threading.Thread):
    """Thread that keeps its target's outcome and can be asked to exit.

    The target's return value (or the ``Exception`` it raised) is stored on
    the instance by :meth:`run` and handed back to the caller by
    :meth:`join`. :meth:`kill` requests a ``SystemExit`` inside the thread
    through the CPython C API.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return_value = None  # result of the target, set by run()
        self._exception = None     # Exception raised by the target, if any

    @staticmethod
    def _async_raise(tid, exctype):
        """Ask the interpreter to raise ``exctype`` in the thread ``tid``.

        Args:
            tid: Identifier of the thread (``Thread.ident``).
            exctype: Exception class to raise; an exception instance is
                accepted too and replaced by its type.

        Raises:
            ValueError: If no thread state matched ``tid``.
            SystemError: If more than one thread state was modified; the
                request is revoked before raising.
        """
        # The C signature takes an unsigned long thread id (Python >= 3.7).
        tid = ctypes.c_ulong(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # More than one thread state was affected: undo the request by
            # passing NULL as the exception, then report the failure.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def kill(self):
        """Request ``SystemExit`` in this thread.

        The request is asynchronous: this method returns immediately and
        does not wait for the thread to actually stop.

        Raises:
            ValueError: If the interpreter does not know this thread's id.
        """
        KillableThread._async_raise(self.ident, SystemExit)

    def run(self):
        """Run the target, recording its return value or its exception.

        Only ``Exception`` subclasses are captured; ``SystemExit`` (used by
        :meth:`kill`) is deliberately left to propagate so the thread ends.
        """
        try:
            # A thread created without a target has nothing to call.
            if self._target is not None:
                self._return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self._exception = e

    def join(self, timeout=None):
        """Wait for the thread and return the target's result.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` (the
                default) to wait until the thread finishes.

        Returns:
            The value returned by the target. ``None`` if the target has
            not finished yet (the wait timed out) or returned ``None``.

        Raises:
            Exception: Whatever exception the target raised, re-raised in
                the calling thread.
        """
        super().join(timeout)
        if self._exception is not None:
            raise self._exception
        return self._return_value
def _get_thread(tid):
    """Return the live ``threading.Thread`` whose ident is ``tid``.

    Returns ``None`` when no thread listed by ``threading.enumerate()``
    has that ident.
    """
    candidates = (thread for thread in threading.enumerate() if thread.ident == tid)
    return next(candidates, None)
def _get_frame_stack(tid):
    """Return the current frame of the thread ``tid``.

    The frame is looked up in ``sys._current_frames()``; ``None`` is
    returned when that mapping has no entry equal to ``tid``.
    """
    frames_by_thread = sys._current_frames()
    matching = [frame for ident, frame in frames_by_thread.items() if ident == tid]
    return matching[0] if matching else None
def _get_formated_frame_stack(tid):
    """Render the current call stack of thread ``tid`` as text.

    The first line is ``"<thread name> thread_id=<tid>"``; each following
    line describes one stack entry as ``" at <source line>(<file>:<lineno>)"``
    where ``<file>`` is the base name of the source file.

    Args:
        tid: Identifier of the thread to describe.

    Returns:
        The formatted, newline-joined description. If the thread can no
        longer be found, its name is shown as ``<unknown>``; if it has no
        frame, only the header line is returned.
    """
    th = _get_thread(tid)
    stack = _get_frame_stack(tid)
    # The thread may have exited between the caller's check and this lookup.
    thread_name = th.name if th is not None else '<unknown>'
    info = ['%s thread_id=%d' % (thread_name, tid)]
    # extract_stack(None) would report *this* thread's stack, which would be
    # misleading, so frames are listed only when the target's frame exists.
    if stack is not None:
        for filename, lineno, _, line in traceback.extract_stack(stack):
            info.append(' at %s(%s:%d)' % (line, os.path.basename(filename), lineno))
    return '\n'.join(info)
def timeout(seconds):
    """
    Decorator to execute a function with a specified timeout.

    The decorated function runs in a separate daemon ``KillableThread``
    while the caller waits for at most ``seconds``. On timeout the worker's
    stack is captured for the error message and the worker is asked to
    exit via ``KillableThread.kill()``; this call does not wait for the
    worker to actually stop.

    Args:
        - seconds (int): The time limit in seconds for the function to complete.

    Returns:
        - function: The decorated function.

    Raises:
        - TimeoutError: If the function does not complete within the specified time limit.
        - Any exception raised by the decorated function is re-raised in the caller.

    Usage:
        @timeout(seconds=10)
        def my_function():
            # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            th = KillableThread(target=func, args=args, kwargs=kwargs)
            th.daemon = True
            th.start()
            th.join(seconds)
            if not th.is_alive():
                # The worker is done. Join again (returns immediately) so
                # the final result is read even if the worker finished just
                # after the timed wait above gave up.
                return th.join(0)
            try:
                formated_frame_stack = _get_formated_frame_stack(th.ident)
            except Exception:
                # Diagnostics are best-effort: the worker may exit while
                # its stack is inspected, and that must not mask the timeout.
                formated_frame_stack = '<frame stack unavailable>'
            try:
                th.kill()
            except ValueError:
                # "invalid thread id": the worker exited on its own before
                # the exit request could be registered; nothing to kill.
                pass
            raise TimeoutError(f"{repr(func)} timeout. Frame stack:\n{formated_frame_stack}")
        return wrapper
    return decorator
def retry(retries=1, retry_interval=0):
    """
    Decorator to retry a function a specified number of times with a given interval between retries.

    The function is always attempted at least once; a negative ``retries``
    is treated as 0 (no retries).

    Args:
        - retries (int): The number of times the function should be retried if it raises an exception.
          If set to 1, the function will be attempted initially and retried once.
        - retry_interval (int): The time interval in seconds to wait between retries.

    Returns:
        - function: The decorated function.

    Raises:
        - The original exception raised by the function if all retries fail.

    Usage:
        @retry(retries=2, retry_interval=2)
        def my_function():
            # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Clamp so that a negative count still performs the first attempt
            # instead of silently returning None without calling func.
            retries_left = max(retries, 0)
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if retries_left <= 0:
                        # Bare raise keeps the original traceback intact.
                        raise
                    retries_left -= 1
                    time.sleep(retry_interval)
        return wrapper
    return decorator
|