1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| import threading import time import queue from dataclasses import dataclass, field from typing import Any import signal import traceback import os
@dataclass(order=True) class ScheduledItem: time_sec: float cb: Any = field(compare=False)
class Driver: def __init__(self): self._scheduled = queue.PriorityQueue() self._scheduled_every_lock = threading.Lock() self._scheduled_every = [] self._callbacks_lock = threading.Lock() self._callbacks = {} self._async_queue = queue.Queue() self._epoch_sec = time.time() self._debug_epoch = None self._last_epoch = None self._sigterm_received = False
def get_epoch(self): return self._epoch_sec
def set_debug_epoch(self, epoch): self._debug_epoch = epoch
def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)): if intercept_signals: self.intercept_sigterm(intercept_signals) while not self.sigterm_received(): self.run_step(wait_sync_interval)
def run_step(self, wait_sync_interval=0): if self._debug_epoch: self._epoch_sec = self._debug_epoch else: self._epoch_sec = time.time() if self._last_epoch is not None: if self._epoch_sec - self._last_epoch < wait_sync_interval: t = wait_sync_interval - (self._epoch_sec - self._last_epoch) time.sleep(t) self._epoch_sec = time.time() self._last_epoch = self._epoch_sec
self._do_async() self._do_schedule() self._do_schedule_every() def schedule(self, cb, time_sec): self._scheduled.put_nowait( ScheduledItem(time_sec, cb) )
def schedule_every(self, cb, interval_sec): self._scheduled_every_lock.acquire() self._scheduled_every.append( { "next_sec":self._epoch_sec+interval_sec, "interval":interval_sec, "cb":cb } ) self._scheduled_every_lock.release()
def add_receiver(self, topic_or_type, cb): self._callbacks_lock.acquire() if topic_or_type not in self._callbacks: self._callbacks[topic_or_type] = set() self._callbacks[topic_or_type].add(cb) self._callbacks_lock.release() return cb
def remove_receiver(self, topic_or_type, cb): self._callbacks_lock.acquire() if topic_or_type in self._callbacks: if cb in self._callbacks[topic_or_type]: self._callbacks[topic_or_type].remove(cb) self._callbacks_lock.release()
def send(self, obj, topic=None): if topic == None: topic = type(obj) cbs = [] self._callbacks_lock.acquire() if topic in self._callbacks.keys(): cbs = list(self._callbacks[topic]) self._callbacks_lock.release() for cb in cbs: cb(obj)
def send_async(self, obj, topic=None): self._async_queue.put_nowait( (obj, topic) )
def _do_async(self): while not self._async_queue.empty(): self.send(*self._async_queue.get_nowait())
def _do_schedule(self): i = 0 while not self._scheduled.empty(): item = self._scheduled.get_nowait() if item.time_sec > self._epoch_sec: self._scheduled.put_nowait(item) break item.cb(self._epoch_sec)
def _do_schedule_every(self): cbs = [] self._scheduled_every_lock.acquire() for o in self._scheduled_every: while self._epoch_sec >= o["next_sec"]: cbs.append(o["cb"]) o["next_sec"] += o["interval"] self._scheduled_every_lock.release() for cb in cbs: cb(self._epoch_sec)
def intercept_sigterm(self, intercept_signals): for sig in intercept_signals: signal.signal(sig, bind(Driver._on_signal, self))
def sigterm_received(self): return self._sigterm_received
def _on_signal(self, signum, stack): print(f"Signal #{signum} received, Traceback:") for filename, lineno, _, line in traceback.extract_stack(stack): print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno)) self._sigterm_received = True
def bind(func, *args, **kw): return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)
driver = Driver()
if __name__ == '__main__': print("initialize")
driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch()+3) driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1) driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch()+10) driver.run(0.1)
print("finalize")
|