1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| import multiprocessing import queue import threading
def call_helper(func, common_data, respective_data): args = [] kw = {} if common_data: assert(isinstance(common_data, list) or isinstance(common_data, tuple) or isinstance(common_data, dict)) if isinstance(common_data, list) or isinstance(common_data, tuple): args += common_data elif isinstance(common_data, dict): kw.update(common_data) if respective_data: assert(isinstance(respective_data, list) or isinstance(respective_data, tuple) or isinstance(respective_data, dict)) if isinstance(respective_data, list) or isinstance(respective_data, tuple): args += respective_data else: kw.update(respective_data) return func(*args, **kw)
def _func(func, common_data, queue_in, queue_out): while True: try: (i, respective_data) = queue_in.get(False) r = call_helper(func, common_data, respective_data) if r is not None: queue_out.put((i, r)) except queue.Empty as e: break
""" 用多进程执行func函数 Args: process_num: 并发的进程数量 func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数 common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递 respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递
Returns: list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致 """ def multi_process(process_num, func, common_data, respective_data_list): manager = multiprocessing.Manager() queue_in = manager.Queue() for i, e in enumerate(respective_data_list): queue_in.put((i,e)) queue_out = manager.Queue() processes = [] for i in range(0, process_num): proc = multiprocessing.Process(target=_func, args=(func, common_data, queue_in, queue_out)) processes.append(proc) for proc in processes: proc.start() for proc in processes: proc.join() if not queue_out.empty(): ret = [None] * len(respective_data_list) while True: try: (i, ret_value) = queue_out.get(False) ret[i] = ret_value except queue.Empty as e: break return ret
""" 用多线程执行func函数 Args: thread_num: 并发的线程数量 func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数 common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递 respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递
Returns: list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致 """ def multi_thread(thread_num, func, common_data, respective_data_list): queue_in = queue.Queue() for i, e in enumerate(respective_data_list): queue_in.put((i,e)) queue_out = queue.Queue() threads = [] for i in range(0, thread_num): thread = threading.Thread(target=_func, args=(func, common_data, queue_in, queue_out)) threads.append(thread) for thread in threads: thread.start() for thread in threads: thread.join() if not queue_out.empty(): ret = [None] * len(respective_data_list) while True: try: (i, ret_value) = queue_out.get(False) ret[i] = ret_value except queue.Empty as e: break return ret
""" 用单线程执行func函数 Args: _dummy_num: 无用的参数,仅为了和其他同类函数统一调用格式 func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数 common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递 respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递
Returns: list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致 """ def single_thread(_dummy_num, func, common_data, respective_data_list): ret = [] for i, respective_data in enumerate(respective_data_list): r = call_helper(func, common_data, respective_data) if r is not None: if not ret: ret = [None] * len(respective_data_list) ret[i] = r return ret
|