Watches the main thread for hangs; when a hang is detected it reports the failure, deliberately crashes the process, and prints the call stack at that moment.
Combined with an external monitor that restarts the process on crash, this gives you automatic restart on hang.

import threading
import traceback
import time
import sys
import os
from functools import wraps

class WatchDog(threading.Thread):

    def __init__(self, timeout=10, echo=False):
        super(WatchDog, self).__init__()
        self.timeout = timeout
        self.echo = echo
        self.last_kicked_ts = time.time()
        self.lock = threading.Lock()
        self.thread_id = threading.current_thread().ident
        self.terminated = False
        self.daemon = True
        self.start()

    def terminate(self):
        self.terminated = True
        self.join(self.timeout)

    def kick(self):
        self.lock.acquire()
        self.last_kicked_ts = time.time()
        self.lock.release()

    def bark(self):
        formatted_frame_stack = self._get_formatted_frame_stack()
        if self.echo:
            print("!!!!! WATCH DOG FAILURE TRIGGERED !!!!!\n" + formatted_frame_stack, flush=True)
        pid = os.getpid()
        os.kill(pid, 2)  # ask the process to exit (SIGINT)
        time.sleep(5)    # wait 5 seconds
        os.kill(pid, 9)  # force kill (SIGKILL)

    def run(self):
        while not self.terminated:
            ts = time.time()
            self.lock.acquire()
            is_timeout = ts - self.last_kicked_ts > self.timeout
            self.lock.release()
            if is_timeout:
                self.bark()
            n = int(max(self.timeout / 3, 1))
            for i in range(n * 10):
                time.sleep(0.1)
                if self.terminated:
                    break

    @staticmethod
    def _get_thread(tid):
        for t in threading.enumerate():
            if t.ident == tid:
                return t
        return None

    @staticmethod
    def _get_frame_stack(tid):
        for thread_id, stack in sys._current_frames().items():
            if thread_id == tid:
                return stack
        return None

    def _get_formatted_frame_stack(self):
        info = []
        th = self._get_thread(self.thread_id)
        stack = self._get_frame_stack(self.thread_id)
        info.append('%s thread_id=%d' % (th.name, self.thread_id))
        for filename, lineno, _, line in traceback.extract_stack(stack):
            info.append('  at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        return '\n'.join(info)


def watch_dog(timeout=10, echo=False):
    def inner(func):
        @wraps(func)
        def wrapper(*args, **kw):
            dog = WatchDog(timeout=timeout, echo=echo)
            ret = func(*args, **kw)
            dog.terminate()
            return ret
        return wrapper
    return inner

Use case 1: monitoring a function for timeout

@watch_dog(timeout=3, echo=True)
def func():
    # do something
    time.sleep(5)

def main():
    func()

if __name__ == '__main__':
    main()

We watch func with a 3-second timeout and sleep for 5 seconds inside it to trigger the timeout. The output looks like this:

!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thread_id=2164
  at main()(watch_dog.py:97)
  at func()(watch_dog.py:94)
  at ret = func(*args, **kw)(watch_dog.py:81)
  at time.sleep(5)(watch_dog.py:91)
[Finished in 3.5s with exit code 2]

The program deliberately crashed 3.5 seconds after startup.

Use case 2: monitoring a loop for timeout

def main():
    watch_dog = WatchDog(timeout=3, echo=True)

    # main loop
    # while True:
    for i in range(0, 10):
        time.sleep(i)
        print(f"i={i}")
        watch_dog.kick()
    watch_dog.terminate()


if __name__ == '__main__':
    main()

The output:

i=0
i=1
i=2
i=3
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thread_id=9060
  at main()(watch_dog.py:101)
  at time.sleep(i)(watch_dog.py:94)
[Finished in 10.1s with exit code 2]

Use case 3: automatic restart on timeout

import multiprocessing

def main():
    ...

if __name__ == '__main__':
    RETRY_TIMES = 5
    for i in range(RETRY_TIMES):  # number of retries
        child_process = multiprocessing.Process(target=main)
        child_process.start()
        child_process.join()

        if child_process.exitcode == 0:
            print("child process exited normally")
            exit(0)
        elif i < RETRY_TIMES - 1:
            print("child process exited abnormally, retrying")
        else:
            print("child process exited abnormally, retry limit reached")
            exit(child_process.exitcode)

Change the time zone with

tzselect

Follow the on-screen prompts to select Asia/Shanghai; in my case the choices were 4, 9, 1.

Then add

TZ='Asia/Shanghai'; export TZ

to .profile so it stays in effect after a reboot.

Point the /etc/localtime symlink at the right zone file:

ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

Update the contents of /etc/timezone:

echo Asia/Shanghai > /etc/timezone

If tzselect itself fails at the first step, install tzdata first; alternatively, the following command does everything in one go:

export TZ=Asia/Shanghai \
DEBIAN_FRONTEND=noninteractive
apt update -yqq \
&& apt install -yqq tzdata \
&& ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo ${TZ} > /etc/timezone \
&& dpkg-reconfigure --frontend noninteractive tzdata

A driver enables a design similar to the Active Object pattern; on top of it, a general-purpose message mechanism is implemented as well. Programming against the driver allows code to be thoroughly decoupled.

# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
    time_sec: float
    cb: Any = field(compare=False)


class Driver:
    def __init__(self):
        self._scheduled = queue.PriorityQueue()
        self._scheduled_every_lock = threading.Lock()
        self._scheduled_every = []
        self._callbacks_lock = threading.Lock()
        self._callbacks = {}
        self._async_queue = queue.Queue()
        self._epoch_sec = time.time()
        self._debug_epoch = None
        self._last_epoch = None
        self._sigterm_received = False

    # Current time inside the driver
    def get_epoch(self):
        return self._epoch_sec

    # Set a debug time; when set, the driver stops updating the current time and uses it instead
    def set_debug_epoch(self, epoch):
        self._debug_epoch = epoch

    # Main loop
    # wait_sync_interval: time between loop iterations; 0 effectively occupies a core
    # intercept_signals: signals to intercept and handle; by default CTRL+C and kill exit the main loop
    def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
        if intercept_signals:
            self.intercept_sigterm(intercept_signals)
        while not self.sigterm_received():
            self.run_step(wait_sync_interval)

    # Run one pass of the main logic; normally called from the main loop
    def run_step(self, wait_sync_interval=0):
        if self._debug_epoch:
            self._epoch_sec = self._debug_epoch
        else:
            self._epoch_sec = time.time()
        if self._last_epoch is not None:
            if self._epoch_sec - self._last_epoch < wait_sync_interval:
                t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
                time.sleep(t)
                self._epoch_sec = time.time()
        self._last_epoch = self._epoch_sec

        self._do_async()
        self._do_schedule()
        self._do_schedule_every()

    # Schedule a one-shot timed task
    def schedule(self, cb, time_sec):
        self._scheduled.put_nowait(ScheduledItem(time_sec, cb))

    # Schedule a recurring task
    def schedule_every(self, cb, interval_sec):
        self._scheduled_every_lock.acquire()
        self._scheduled_every.append({"next_sec": self._epoch_sec + interval_sec, "interval": interval_sec, "cb": cb})
        self._scheduled_every_lock.release()

    # Add a message receiver
    def add_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type not in self._callbacks:
            self._callbacks[topic_or_type] = set()
        self._callbacks[topic_or_type].add(cb)
        self._callbacks_lock.release()
        return cb

    # Remove a message receiver
    def remove_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type in self._callbacks:
            if cb in self._callbacks[topic_or_type]:
                self._callbacks[topic_or_type].remove(cb)
        self._callbacks_lock.release()

    # Send a message synchronously
    def send(self, obj, topic=None):
        if topic is None:
            topic = type(obj)
        cbs = []
        self._callbacks_lock.acquire()
        if topic in self._callbacks.keys():
            cbs = list(self._callbacks[topic])
        self._callbacks_lock.release()
        for cb in cbs:
            cb(obj)

    # Send a message asynchronously
    def send_async(self, obj, topic=None):
        self._async_queue.put_nowait((obj, topic))

    def _do_async(self):
        while not self._async_queue.empty():
            self.send(*self._async_queue.get_nowait())

    def _do_schedule(self):
        while not self._scheduled.empty():
            item = self._scheduled.get_nowait()
            if item.time_sec > self._epoch_sec:
                self._scheduled.put_nowait(item)
                break
            item.cb(self._epoch_sec)

    def _do_schedule_every(self):
        cbs = []
        self._scheduled_every_lock.acquire()
        for o in self._scheduled_every:
            while self._epoch_sec >= o["next_sec"]:
                cbs.append(o["cb"])
                o["next_sec"] += o["interval"]
        self._scheduled_every_lock.release()
        for cb in cbs:
            cb(self._epoch_sec)

    # Intercept termination signals
    def intercept_sigterm(self, intercept_signals):
        for sig in intercept_signals:
            signal.signal(sig, bind(Driver._on_signal, self))

    # Whether a termination signal has been received
    def sigterm_received(self):
        return self._sigterm_received

    def _on_signal(self, signum, stack):
        print(f"Signal #{signum} received, Traceback:")
        for filename, lineno, _, line in traceback.extract_stack(stack):
            print('  at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        self._sigterm_received = True


def bind(func, *args, **kw):
    return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
    print("initialize")

    driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch() + 3)
    driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1)
    driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch() + 10)
    driver.run(0.1)

    print("finalize")

SQL Server (version 2005): reading table information from a database

select
    name as 'table_name',
    create_date as 'create_time',
    modify_date as 'update_time'
from sys.tables
where type = 'U'

SQL Server (version 2005): reading column information from a table

SELECT
    a.name as 'column_name',
    b.name as 'data_type',
    COLUMNPROPERTY(a.id, a.name, 'PRECISION') as 'data_length',
    COLUMNPROPERTY(a.id, a.name, 'Scale') as 'data_scale',
    case when exists(SELECT 1 FROM sysobjects where xtype='PK' and parent_obj=a.id and name in (
        SELECT name FROM sysindexes WHERE indid in (SELECT indid FROM sysindexkeys WHERE id = a.id AND colid=a.colid))) then 1 else 0 end as 'primary_key',
    COLUMNPROPERTY(a.id, a.name, 'IsIdentity') as 'autoincrement',
    a.isnullable as 'nullable',
    e.text as 'column_default',
    g.value as 'column_comment'
FROM syscolumns a
    left join systypes b on a.xusertype=b.xusertype
    left join sysobjects d on a.id=d.id and d.xtype='U'
    left join syscomments e on a.cdefault=e.id
    left join sys.extended_properties g on a.id=g.major_id and a.colid=g.minor_id
where d.name='Role'
order by a.id, a.colorder

A sample result:

column_name data_type data_length data_scale primary_key autoincrement nullable column_default column_comment
id int 10 0 1 1 0
name nvarchar 50 0 0 0 Name
description nvarchar 300 0 0 1 Description

MySQL (version 5.7): reading table information from a database

SELECT
    TABLE_NAME AS 'table_name',
    TABLE_ROWS AS 'table_rows',
    CREATE_TIME AS 'create_time',
    UPDATE_TIME AS 'update_time'
FROM
    information_schema.TABLES
WHERE TABLE_SCHEMA = 'test'

Here TABLE_SCHEMA is the database name.
A sample result:

table_name table_rows create_time update_time
goods 5 2020/1/15 17:10 2020/1/15 17:10

Note that update_time is when the table structure was last changed, not when the data was; in my tests create_time changed along with it as well, for reasons I don't know.

MySQL (version 5.7): reading column information from a table

SELECT
    COLUMN_NAME as 'column_name',
    DATA_TYPE as 'data_type',
    IFNULL(CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION) as 'data_length',
    NUMERIC_SCALE as 'data_scale',
    COLUMN_TYPE as 'column_type',
    IF(COLUMN_KEY='PRI', 1, 0) as 'primary_key',
    IF(EXTRA='auto_increment', 1, 0) as 'autoincrement',
    IF(IS_NULLABLE='YES', 1, 0) as 'nullable',
    COLUMN_DEFAULT as 'column_default',
    COLUMN_COMMENT as 'column_comment'
FROM
    information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'test' and TABLE_NAME = 'goods'

Here TABLE_SCHEMA is the database name and TABLE_NAME is the table name.
A sample result:

column_name data_type data_length data_scale column_type primary_key autoincrement nullable column_default column_comment
id int 10 0 int(11) 1 1 0
name varchar 100 varchar(100) 0 0 1 Name
price decimal 18 2 decimal(18,2) 0 0 0 0 Price

# -*- coding: utf-8 -*-
from redis import Redis
import socket
import os
import threading
import time
import uuid
from contextlib import ExitStack, contextmanager

# Lock time slice.
# A lock held for longer than one time slice should repeatedly extend its expiry with expire.
# The expiry exists so the lock is released automatically, and quickly, if the program dies
# unexpectedly, so the slice should not be too long. It should not be too short either: first,
# refreshing the expiry would hit redis too often and waste resources; second, under heavy load
# the refreshing thread might fail to renew in time.
# 5~20 seconds is recommended.
LOCK_TIME_SLICE = 10


class KeepLockThread(threading.Thread):

    def __init__(self, redis, key_name):
        super().__init__()
        self.redis = redis
        self.key_name = key_name

        self.terminated = False
        self.lock = threading.Lock()

    def run(self):
        while True:
            time.sleep(LOCK_TIME_SLICE // 3 + 1)

            with self.lock:
                if not self.terminated:
                    self.redis.expire(self.key_name, LOCK_TIME_SLICE)
                else:
                    return

    def stop(self):
        with self.lock:
            self.terminated = True


# Mutual-exclusion lock
class Mutex:
    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.key_name = "MUTEX_" + name
        hostname = socket.gethostname()
        pid = os.getpid()
        tid = threading.get_ident()
        # Build a unique ID; hostname, PID and TID are included for debugging
        self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"
        self.redis = Redis(host=server)
        self.thread = None

    # blocking: whether to block
    # shortlived: whether the lock is held only briefly; if so, no extra thread is created to
    # maintain the expiry. "Briefly" means the hold time is guaranteed to be under LOCK_TIME_SLICE.
    # The hold time runs from acquiring the lock to releasing it, excluding time spent blocked
    # waiting; a lock can wait a long time to be acquired and still count as shortlived.
    def acquire(self, blocking=True, shortlived=False):
        r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
        if blocking:
            while not r:
                time.sleep(0.05)
                r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
        if r:
            if not shortlived:
                # The lock was acquired, so start a thread to maintain the expiry
                self.thread = KeepLockThread(self.redis, self.key_name)
                self.thread.daemon = True
                self.thread.start()
        return r

    def release(self):
        if self.acquired():
            if self.thread:
                self.thread.stop()
                self.thread = None
            self.redis.delete(self.key_name)

    def acquired(self):
        r = self.redis.get(self.key_name)
        if r is not None and r.decode() == str(self.id):
            return True
        else:
            return False

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_trackback):
        self.release()
        return False  # propagate any exception


def mutex(names, server="127.0.0.1"):
    def medium(func):
        def wrapper(*args, **kw):
            if type(names) == list:
                with ExitStack() as stack:
                    for name in names:
                        stack.enter_context(Mutex(name, server))
                    return func(*args, **kw)
            else:
                with Mutex(names, server):
                    return func(*args, **kw)
        wrapper.__name__ = func.__name__
        wrapper.__doc__ = func.__doc__
        return wrapper
    return medium


class KeepReadLockThread(threading.Thread):

    def __init__(self, redis, lock_id, rlock_name):
        super().__init__()
        self.redis = redis
        self.id = lock_id
        self.rlock_name = rlock_name

        self.terminated = False
        self.lock = threading.Lock()

    def run(self):
        while True:
            time.sleep(LOCK_TIME_SLICE // 3 + 1)

            with self.lock:
                if not self.terminated:
                    pipeline = self.redis.pipeline()
                    now = time.time()
                    pipeline.zadd(self.rlock_name, {self.id: int(now * 1000)})
                    pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
                    pipeline.execute()
                else:
                    return

    def stop(self):
        with self.lock:
            self.terminated = True


# Read-write lock
class ReadWriteLock:

    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.server = server
        self.rlock_name = "RLOCK_" + name
        self.wlock_name = "WLOCK_" + name
        self.meta_lock_name = "META_" + name

        hostname = socket.gethostname()
        pid = os.getpid()
        tid = threading.get_ident()
        # Build a unique ID; hostname, PID and TID are included for debugging
        self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"

        self.redis = Redis(host=server)
        self.lock_type = None
        self.thread = None

    def acquire_read_lock(self, blocking=True, shortlived=False):
        # This mutex protects the read-write lock's own internal data; it must be
        # released before leaving any ReadWriteLock method
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            mutex.acquire(shortlived=True)
            wlock_locked = self.redis.get(self.wlock_name)
            if wlock_locked:
                if blocking:
                    while wlock_locked:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire(shortlived=True)
                        wlock_locked = self.redis.get(self.wlock_name)
                else:
                    return False

            pipeline = self.redis.pipeline()
            now = time.time()
            # Remove read locks that expired a while ago (clean up read locks left behind by failures)
            pipeline.zremrangebyscore(self.rlock_name, 0, int((now - LOCK_TIME_SLICE) * 1000))
            # Add the new read lock
            pipeline.zadd(self.rlock_name, {self.id: int(now * 1000)})
            pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
            pipeline.execute()
            self.lock_type = 'R'

            if not shortlived:
                # Start a thread to maintain the read lock's expiry
                self.thread = KeepReadLockThread(self.redis, self.id, self.rlock_name)
                self.thread.daemon = True
                self.thread.start()

            return True
        finally:
            mutex.release()

    def acquire_write_lock(self, blocking=True, shortlived=False):
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            # Note that shortlived is always True for this mutex; it is unrelated to whether
            # the caller's use of the read-write lock is shortlived. The mutex is only held
            # briefly while acquiring, to guarantee that no new read locks appear between
            # seeing the read count drop to 0 and taking the write lock.
            mutex.acquire(shortlived=True)
            # Remove read locks that expired a while ago (clean up read locks left behind by failures)
            self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - LOCK_TIME_SLICE) * 1000))
            # Get the current number of read locks
            r = self.redis.zcard(self.rlock_name)
            if r:
                if blocking:
                    while r:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire(shortlived=True)
                        # Remove read locks that expired a while ago
                        self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - LOCK_TIME_SLICE) * 1000))
                        # Get the current number of read locks
                        r = self.redis.zcard(self.rlock_name)
                else:
                    return False
            # Try to take the write lock
            r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
            if blocking:
                while not r:
                    mutex.release()
                    time.sleep(0.05)
                    # The write lock was not acquired, so read locks may have been taken
                    # elsewhere in the meantime; wait again for the read count to reach 0 first
                    mutex.acquire(shortlived=True)
                    self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - LOCK_TIME_SLICE) * 1000))
                    r = self.redis.zcard(self.rlock_name)
                    if r:
                        if blocking:
                            while r:
                                mutex.release()
                                time.sleep(0.05)
                                mutex.acquire(shortlived=True)
                                self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - LOCK_TIME_SLICE) * 1000))
                                r = self.redis.zcard(self.rlock_name)
                        else:
                            return False
                    r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
            if r:
                if not shortlived:
                    self.thread = KeepLockThread(self.redis, self.wlock_name)
                    self.thread.daemon = True
                    self.thread.start()

                self.lock_type = 'W'
            return r
        finally:
            mutex.release()

    def acquired(self, lock_type="any"):
        if lock_type == "any":
            return self.lock_type is not None
        else:
            return self.lock_type == lock_type

    def release(self):
        if self.thread:
            self.thread.stop()
            self.thread = None
        if self.lock_type == 'R':
            self.redis.zrem(self.rlock_name, self.id)
            self.lock_type = None
        elif self.lock_type == 'W':
            r = self.redis.get(self.wlock_name)
            if r is not None and r.decode() == str(self.id):
                self.redis.delete(self.wlock_name)
            self.lock_type = None


@contextmanager
def read_lock(name, server="127.0.0.1"):
    rwlock = ReadWriteLock(name=name, server=server)
    try:
        rwlock.acquire_read_lock()
        yield rwlock
    finally:
        rwlock.release()


@contextmanager
def write_lock(name, server="127.0.0.1"):
    rwlock = ReadWriteLock(name=name, server=server)
    try:
        rwlock.acquire_write_lock()
        yield rwlock
    finally:
        rwlock.release()

Mutex supports both blocking and non-blocking acquisition, blocking by default.
It also accounts for crashing while holding the lock: the key is given a Redis expiry so it is deleted automatically on timeout, and a long-held lock keeps renewing the expiry from a background thread. If the holder crashes after acquiring, the lock is released automatically within one time slice.

On top of Mutex, a ReadWriteLock is implemented as well; it likewise supports blocking and non-blocking use, and automatic release after a crash.
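
A quick usage sketch, assuming a Redis instance on localhost; the lock names here are illustrative:

# Non-blocking mutex acquisition
m = Mutex("inventory")
if m.acquire(blocking=False):
    try:
        pass  # critical section
    finally:
        m.release()

# As a context manager
with Mutex("inventory"):
    pass  # critical section

# As a decorator holding several locks at once
@mutex(["inventory", "orders"])
def transfer():
    pass  # runs with both locks held

# Read-write lock via the contextmanager helpers
with read_lock("catalog"):
    pass  # shared read access
with write_lock("catalog"):
    pass  # exclusive write access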

Wrapping a base class

Model base class: code for managing the engine, transaction handling, and JSON serialization.

Shared code, db_.py

# -*- coding: utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
from urllib import parse
import re
import datetime
import types
import decimal


class ModelBase(object):

    @declared_attr
    def __tablename__(cls):
        # return cls.__name__.lower()
        return re.sub(r'([A-Z])', r'_\1', cls.__name__[0].lower() + cls.__name__[1:]).lower()

    @classmethod
    def props(cls):
        if cls.__base__.__name__ == "Base":
            return [c for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_columns = [c for c in cls.__base__.__table__.columns]
            columns = [c for c in cls.__table__.columns if c.name != 'id']
            return super_columns + columns
        else:
            assert False, "props for deeper inheritance is not implemented yet"

    @classmethod
    def prop_names(cls):
        if cls.__base__.__name__ == "Base":
            return [c.name for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_column_names = [c.name for c in cls.__base__.__table__.columns]
            column_names = [c.name for c in cls.__table__.columns if c.name != 'id']
            return super_column_names + column_names
        else:
            assert False, "prop_names for deeper inheritance is not implemented yet"

    @classmethod
    def prop(cls, prop_name):
        return cls.__table__.columns[prop_name]

    # How the object is serialized to a string when printed
    def __repr__(self):
        attrs = []
        # for c in self.__table__.columns:
        for name in self.prop_names():
            attr = getattr(self, name)
            if type(attr) in (str, datetime.date, datetime.time, datetime.datetime):
                attrs.append(f"{name}='{attr}'")
            else:
                attrs.append(f"{name}={attr}")
        return f"{self.__class__.__name__}({', '.join(attrs)})"

    # List of keys used when converting to a dict
    def keys(self):
        return self.prop_names()

    def __getitem__(self, item):
        return getattr(self, item)

    def __setitem__(self, item, value):
        return setattr(self, item, value)

    def to_dict(self):
        return {name: to_jsonable(getattr(self, name)) for name in self.keys()}


def to_dict(db_obj):
    if isinstance(db_obj, ModelBase):
        return db_obj.to_dict()
    else:
        return db_obj

def to_list(db_objs):
    return [to_dict(db_obj) for db_obj in db_objs]


# Base = declarative_base(cls=ModelBase)


# Convert ORM objects into JSON-serializable objects: a list of ORM objects becomes
# a list of dicts, and a single ORM object becomes a dict
def to_jsonable(o):
    if isinstance(o, list):
        return [to_jsonable(e) for e in o]
    if isinstance(o, dict):
        return {k: to_jsonable(v) for (k, v) in o.items()}
    if isinstance(o, ModelBase):
        return o.to_dict()
    if isinstance(o, datetime.datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(o, datetime.date):
        return o.strftime('%Y-%m-%d')
    if isinstance(o, datetime.time):
        return o.strftime('%H:%M:%S')
    if isinstance(o, decimal.Decimal):
        return float(o)
    return o


engines = {}

r"""
Sample URLs for different databases
r"sqlite:///:memory:"                                            in-memory sqlite
r"sqlite:///C:\path\foo.db"                                      sqlite on Windows
r"sqlite:////root/data/foo.db"                                   sqlite on Linux
r"mysql+pymysql://username:password@server/dbname"               MySQL
r"postgresql+psycopg2://username:password@server/dbname"         PostgreSQL
r"mssql+pymssql://username:password@server/dbname?charset=utf8"  Microsoft SQL Server
"""
def init_engine(
        url=None, name="main",
        dialect=None, username="", password="", server="", dbname=None,
        **kwargs):
    if dialect:
        if dialect.lower() in ("mysql", ):
            dialect = "mysql+pymysql"
        elif dialect.lower() in ("postgresql", "postgres", "pgsql"):
            dialect = "postgresql+psycopg2"
        elif dialect.lower() in ("mssql", "sqlserver"):
            dialect = "mssql+pymssql"
    if url is None:
        if password:
            url = '{}://{}:{}@{}/{}'.format(dialect, username, parse.quote_plus(password), server, dbname)
        elif username:
            url = '{}://{}@{}/{}'.format(dialect, username, server, dbname)
        else:
            url = '{}://{}/{}'.format(dialect, server, dbname)

    # # Default connection-pool size
    # if "pool_size" not in kwargs:
    #     kwargs["pool_size"] = 5

    # # Default connection recycle time (MySQL defaults to 8 hours)
    # if "pool_recycle" not in kwargs:
    #     kwargs["pool_recycle"] = 28000

    # Enable pool_pre_ping by default
    if "pool_pre_ping" not in kwargs:
        kwargs["pool_pre_ping"] = True

    engine = create_engine(url, **kwargs)
    session_maker = sessionmaker(expire_on_commit=False)
    session_maker.configure(bind=engine)

    @contextmanager
    def _session_scope(maker):
        session = maker()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    engine.session_scope = types.MethodType(_session_scope, session_maker)
    engines[name] = engine
    return engine


def exit_engine(name="main"):
    if name in engines:
        del engines[name]


def get_engine(name="main"):
    if name in engines:
        return engines[name]
    else:
        return None


def session_scope(name="main"):
    if name in engines:
        return engines[name].session_scope()
    else:
        raise Exception("engine is not initialized")

Database model (table) definitions

db_test.py builds on db_.py to define and use a database: how to define relationship mappings, foreign keys, unique keys and indexes, and how to convert query results into JSON-serializable objects (so they can be returned through a RESTful API).

# -*- coding: utf-8 -*-
from sqlalchemy import Table, Column, Integer, Float, Numeric, String, DateTime, Date, Time, Boolean
from sqlalchemy import PrimaryKeyConstraint, ForeignKey, Index, UniqueConstraint
from sqlalchemy import text, func, and_, or_, not_, asc, inspect, desc, distinct
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base

from utils.db_ import ModelBase, init_engine, exit_engine, get_engine, session_scope, to_jsonable

Base = declarative_base(cls=ModelBase)


class Clazz(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    enroll_year = Column(Integer(), comment="enrollment year")


class Student(Base):
    __table_args__ = (
        UniqueConstraint('clazz_id', 'name'),               # multi-column unique constraint
        Index('ix_clazz_id_stu_no', 'clazz_id', 'stu_no'),  # multi-column index
    )
    id = Column(Integer(), primary_key=True, autoincrement=True)  # auto-increment single-column primary key; for a composite key, set primary_key=True on each column
    stu_no = Column(String(), index=True, unique=True, comment="student number")  # single-column index and single-column unique constraint
    name = Column(String(), nullable=False, comment="name")  # NOT NULL constraint
    clazz_id = Column(Integer(), ForeignKey('clazz.id', ondelete='RESTRICT'), nullable=True, comment="class ID")  # foreign key

    clazz = relationship('Clazz')  # many-to-one (one-to-many) relationship mapping


class Course(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    name = Column(String(), comment="course name")

    students = relationship('Student', secondary='student_course', backref=backref('courses'))  # many-to-many mapping; backref effectively adds a courses attribute to Student

    def keys(self):
        return super().keys() + ["students", ]


# Course enrollment (many-to-many association table)
student_course = Table('student_course', Base.metadata,
    Column('student_id', Integer(), ForeignKey('student.id'), primary_key=True),
    Column('course_id', Integer(), ForeignKey('course.id'), primary_key=True))


def main():
    engine = init_engine(url=r"sqlite:///:memory:")
    Base.metadata.create_all(engine)  # create all tables from the definitions
    with session_scope() as session:
        clazz = Clazz(enroll_year=2022)  # create a clazz object
        session.add(clazz)               # insert a row from the clazz object
        session.flush()                  # execute immediately
        session.refresh(clazz)           # refresh clazz so the id of the new row is filled into the object
        print(clazz)

        course = Course(name="Math")
        session.add(course)
        session.flush()
        course = session.query(Course).filter(Course.name == "Math").first()  # query the row just inserted
        print(course)

        student = Student(stu_no="007", name="Zhang San", clazz_id=clazz.id)
        student.courses.append(course)  # backref gave student a courses attribute, which maintains the association table
        session.add(student)
        session.flush()

        session.refresh(course)
        print(course.students)
        print(to_jsonable(course))  # to_jsonable sees the students attribute because keys() is overridden

        student.name = "Li Si"
        session.flush()  # this effectively executes an update


        # Run a raw SQL query
        r = session.execute(text("select id, name from student")).fetchall()
        for e in r:
            print(e)  # the name has changed to Li Si



if __name__ == '__main__':
    main()

Generating model definition code from an existing database with sqlacodegen

Install it via pip:

pip install sqlacodegen

The command format is sqlacodegen followed by the same URL used to initialize the engine:

sqlacodegen <sqldialect>[+<sqldriver>]://<username>:<password>@<server>/<dbname>

Running the command prints the generated code to the console; you can redirect it into a .py file.

Example:

sqlacodegen mysql+pymysql://mzdai:123456@192.168.1.140/mzdai > db_mzdai.py

You can then edit the generated code, for example importing pieces of db_ and swapping the model base class to declarative_base(cls=ModelBase).

Temporary change (lost after reboot):

hostname xxx

Permanent change (takes effect after reboot):

vim /etc/hostname

Update the hosts file to match:

vim /etc/hosts

The default sources are hosted abroad and downloads can be slow; you can switch to a domestic mirror such as Aliyun's by replacing /etc/apt/sources.list with:

# Aliyun mirror
deb http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
# Pre-release packages; may be left disabled
deb http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse

The three current mainstream LTS releases are codenamed:

18.04 LTS bionic

20.04 LTS focal

22.04 LTS jammy

For other releases, replace bionic with the corresponding codename.

You can also write the file directly with tee instead of opening it in vim or nano; for example, for 20.04 LTS without the pre-release repos:

tee /etc/apt/sources.list <<-'EOF'
deb http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
EOF

Update the package index:

apt-get update

If it reports NO_PUBKEY 3B4FE6ACC0B21F32, run:

apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32

Python threads provide no way to terminate a thread from the outside, and by design you should avoid needing one, but sometimes we want to treat a thread timeout as an exception and terminate the thread early. The code below offers one possible approach.

import threading
import inspect
import ctypes

def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def kill_thread(tid):
    _async_raise(tid, SystemExit)

In practice this works in most cases, with the exception of a thread stuck in time.sleep for a long stretch: the code above cannot wake a sleeping thread and terminate it, because the asynchronous exception is only delivered while the thread is executing Python bytecode.
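
A hypothetical usage sketch; the worker here spins in Python code so the asynchronous exception can actually be delivered:

import time

def worker():
    try:
        while True:
            pass  # spinning in Python bytecode, so the async exception can land
    except SystemExit:
        print("worker terminated")

t = threading.Thread(target=worker)
t.start()
time.sleep(0.5)       # let the worker get going
kill_thread(t.ident)  # raises SystemExit inside the worker
t.join()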

Unsupervised learning algorithms receive only input data (features) and do not use (or even know) the outputs (targets) corresponding to those inputs; they extract useful knowledge from the inputs alone.

The unsupervised algorithms in sklearn fall mainly into these categories: preprocessing, decomposition, and clustering.

Algorithm implementations in scikit-learn

Algorithm                          Module         Class                    Main parameters
Min-max scaling                    preprocessing  MinMaxScaler             feature_range=(0,1)
Standardization                    preprocessing  StandardScaler
One-hot encoding                   preprocessing  OneHotEncoder            categorical_features='all'
Polynomial features                preprocessing  PolynomialFeatures       degree=2
Principal component analysis       decomposition  PCA                      n_components, whiten=False
Non-negative matrix factorization  decomposition  NMF                      n_components
K-means clustering                 cluster        KMeans                   n_clusters
Agglomerative clustering           cluster        AgglomerativeClustering  n_clusters, linkage=ward
DBSCAN                             cluster        DBSCAN                   min_samples, eps

As the table shows, the modules these algorithms belong to correspond exactly to preprocessing, decomposition, and clustering.

Preprocessing prepares the input data for supervised learning algorithms; both its input and its output are a set of features.

In fact, decomposition and clustering can also serve as preprocessing for supervised learning, though they offer some extra capabilities of their own.

Brief notes on each algorithm follow:

MinMaxScaler

Scales and shifts each feature according to its minimum and maximum; with the default parameters every feature ends up between 0 and 1.

StandardScaler

Scales and shifts each feature according to its mean and variance; with the default parameters every feature has mean 0 and variance 1.
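
A minimal sketch of both scalers on made-up data:

import numpy as np
from sklearn.preprocessing import MinMaxScaler, StandardScaler

X = np.array([[1.0, 200.0], [2.0, 400.0], [3.0, 600.0]])

print(MinMaxScaler().fit_transform(X))    # each column mapped into [0, 1]
print(StandardScaler().fit_transform(X))  # each column to mean 0, variance 1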

OneHotEncoder

Encodes categorical variables (features taking discrete enumerated values): a feature with N possible values is represented as N features of 0/1 values.
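
A sketch on made-up data (note the categorical_features parameter from the table has been removed in recent sklearn versions; the default constructor encodes every column):

import numpy as np
from sklearn.preprocessing import OneHotEncoder

X = np.array([["red"], ["green"], ["blue"], ["green"]])
# each of the 3 colors becomes its own 0/1 column
print(OneHotEncoder().fit_transform(X).toarray())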

PolynomialFeatures

Builds interaction and polynomial features from the original ones; for example, (x1, x2) can generate (1, x1, x2, x1^2, x2^2, x1*x2). The degree parameter controls the highest power generated. This lets a linear model learn results that are nonlinear in the original features.
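
A sketch of the generated columns:

import numpy as np
from sklearn.preprocessing import PolynomialFeatures

X = np.array([[2.0, 3.0]])
# degree=2 yields (1, x1, x2, x1^2, x1*x2, x2^2) -> [[1. 2. 3. 4. 6. 9.]]
print(PolynomialFeatures(degree=2).fit_transform(X))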

PCA

Principal component analysis finds a new representation of the original features. In linear-algebra terms, it finds an orthogonal basis, treats the original features as vectors, and computes their coordinates in the new basis, yielding the features in the new representation. The basis is chosen by first finding the axis along which the original features have the greatest variance, which becomes the first basis axis; the second axis is found on the hyperplane "perpendicular" to the first, again picking the direction of greatest variance; the third axis is the direction of greatest variance on the hyperplane "perpendicular" to the first two, and so on. Because the transformed features have high variance along the first few axes, decreasing thereafter, we have reason to expect the first few components of the new features to matter more to the target (though not necessarily), so PCA can be used to reduce feature dimensionality (i.e., drop the coordinates along the later, least-spread axes).
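
A minimal dimensionality-reduction sketch on random data:

import numpy as np
from sklearn.decomposition import PCA

rng = np.random.RandomState(0)
X = rng.rand(100, 5)
X_reduced = PCA(n_components=2).fit_transform(X)  # keep the 2 highest-variance axes
print(X_reduced.shape)  # (100, 2)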

NMF

Non-negative matrix factorization is similar in spirit to PCA, but its basis is not orthogonal. NMF requires the original features to be non-negative, and every component of its basis vectors, as well as the resulting new features, is non-negative too. For data formed by superimposing several independent sources, such as an audio track of multiple speakers or music with several instruments, NMF can identify the original components that make up the combined data. NMF can also be used for dimensionality reduction.

KMeans

K-means clustering. Clustering partitions a dataset into groups by some rule so that data within a group are similar and data in different groups differ; in clustering algorithms these groups are called clusters. K-means must be told the number of clusters in advance. The core of the algorithm is to keep correcting each cluster's center: first pick K points at random as cluster centers, then alternate two steps, assigning each data point to the nearest cluster center and resetting each cluster center to the mean of all points assigned to it, repeating until the assignments stop changing. As mentioned earlier, clustering can serve as preprocessing for supervised learning: if we let the cluster center stand for the points in its cluster, each point can be represented by a single component (the cluster index), which is called vector quantization. (That component generally has no continuous meaning, though, so it presumably needs one-hot encoding afterwards.)
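
A sketch on two made-up blobs:

import numpy as np
from sklearn.cluster import KMeans

rng = np.random.RandomState(0)
# two blobs, around (0,0) and (5,5)
X = np.vstack([rng.randn(50, 2), rng.randn(50, 2) + 5])
labels = KMeans(n_clusters=2, n_init=10).fit_predict(X)  # a cluster index per point
print(labels[:5], labels[-5:])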

AgglomerativeClustering

Agglomerative clustering starts by treating every data point as its own cluster, then merges clusters step by step according to some rule; the merge process can be visualized as a dendrogram. Agglomerative clustering also requires the number of clusters up front.

DBSCAN

DBSCAN clustering picks a random unlabeled data point and labels it with a new cluster, then does a DFS with eps as the distance, adding every point reached to the cluster; if the traversal ultimately reaches fewer than min_samples points, they are treated as noise. It then picks another random point, traverses and labels again, continuing until every point is either labeled with a cluster or marked as noise. DBSCAN can produce clusters with complex shapes, and the noise can also be used for outlier detection.
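
A sketch with two made-up blobs and one far-away point that ends up as noise:

import numpy as np
from sklearn.cluster import DBSCAN

rng = np.random.RandomState(0)
X = np.vstack([rng.randn(50, 2), rng.randn(50, 2) + 5, [[100.0, 100.0]]])
labels = DBSCAN(eps=1.0, min_samples=5).fit_predict(X)
print(set(labels))  # typically cluster labels 0 and 1, plus -1 for noise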