Python实现的一个消息总线+计划任务 Driver

通过driver可以进行类似ActiveObject模式的设计,在此基础上还实现了通用的消息机制,基于driver进行编程可以充分的解耦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
time_sec: float
cb: Any = field(compare=False)


class Driver:
def __init__(self):
self._scheduled = queue.PriorityQueue()
self._scheduled_every_lock = threading.Lock()
self._scheduled_every = []
self._callbacks_lock = threading.Lock()
self._callbacks = {}
self._async_queue = queue.Queue()
self._epoch_sec = time.time()
self._debug_epoch = None
self._last_epoch = None
self._sigterm_received = False

# 得到driver内的当前时间
def get_epoch(self):
return self._epoch_sec

# 设置调试时间,设置了调试时间driver内将不更新当前时间而采用调试时间
def set_debug_epoch(self, epoch):
self._debug_epoch = epoch

# 主循环
# wait_sync_interval 每趟循环的时间间隔,如果为0相当于占用一个core
# intercept_signals 拦截并处理信号,默认处理CTRL+C和kill,收到时退出主循环
def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
if intercept_signals:
self.intercept_sigterm(intercept_signals)
while not self.sigterm_received():
self.run_step(wait_sync_interval)

# 执行一趟主逻辑,一般放在主循环中执行
def run_step(self, wait_sync_interval=0):
if self._debug_epoch:
self._epoch_sec = self._debug_epoch
else:
self._epoch_sec = time.time()
if self._last_epoch is not None:
if self._epoch_sec - self._last_epoch < wait_sync_interval:
t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
time.sleep(t)
self._epoch_sec = time.time()
self._last_epoch = self._epoch_sec

self._do_async()
self._do_schedule()
self._do_schedule_every()

# 计划单次定时任务
def schedule(self, cb, time_sec):
self._scheduled.put_nowait( ScheduledItem(time_sec, cb) )

# 计划重复任务
def schedule_every(self, cb, interval_sec):
self._scheduled_every_lock.acquire()
self._scheduled_every.append( { "next_sec":self._epoch_sec+interval_sec, "interval":interval_sec, "cb":cb } )
self._scheduled_every_lock.release()

# 增加消息接收者
def add_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type not in self._callbacks:
self._callbacks[topic_or_type] = set()
self._callbacks[topic_or_type].add(cb)
self._callbacks_lock.release()
return cb

# 删除消息接收者
def remove_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type in self._callbacks:
if cb in self._callbacks[topic_or_type]:
self._callbacks[topic_or_type].remove(cb)
self._callbacks_lock.release()

# 同步发送消息
def send(self, obj, topic=None):
if topic == None:
topic = type(obj)
cbs = []
self._callbacks_lock.acquire()
if topic in self._callbacks.keys():
cbs = list(self._callbacks[topic])
self._callbacks_lock.release()
for cb in cbs:
cb(obj)

# 异步发送消息
def send_async(self, obj, topic=None):
self._async_queue.put_nowait( (obj, topic) )

def _do_async(self):
while not self._async_queue.empty():
self.send(*self._async_queue.get_nowait())

def _do_schedule(self):
i = 0
while not self._scheduled.empty():
item = self._scheduled.get_nowait()
if item.time_sec > self._epoch_sec:
self._scheduled.put_nowait(item)
break
item.cb(self._epoch_sec)

def _do_schedule_every(self):
cbs = []
self._scheduled_every_lock.acquire()
for o in self._scheduled_every:
while self._epoch_sec >= o["next_sec"]:
cbs.append(o["cb"])
o["next_sec"] += o["interval"]
self._scheduled_every_lock.release()
for cb in cbs:
cb(self._epoch_sec)

# 拦截终止信号
def intercept_sigterm(self, intercept_signals):
for sig in intercept_signals:
signal.signal(sig, bind(Driver._on_signal, self))

# 是否收到了终止信号
def sigterm_received(self):
return self._sigterm_received

def _on_signal(self, signum, stack):
print(f"Signal #{signum} received, Traceback:")
for filename, lineno, _, line in traceback.extract_stack(stack):
print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
self._sigterm_received = True


def bind(func, *args, **kw):
return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
print("initialize")

driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch()+3)
driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1)
driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch()+10)
driver.run(0.1)

print("finalize")