Alembic requires SQLAlchemy. If your project is written in Python on top of SQLAlchemy, you can use Alembic to manage database schema versions.

Alembic calls a set of changes between versions a migration, and the process of applying them migrating: we say we migrate from one version to another.

First we create a directory for Alembic.

Initialize Alembic:

alembic init <YOUR_ALEMBIC_DIR>

For example:

alembic init alembic_foo, where foo can be the name of the project's database.

There are two files to modify:

alembic.ini
alembic_foo/env.py

In alembic.ini, modify sqlalchemy.url.

For example: sqlalchemy.url = postgresql+psycopg2://username:password@ip_address/dbname

In env.py, modify the line # target_metadata = mymodel.Base.metadata

Import the Base on which the project's data models are defined and change the line to target_metadata = Base.metadata, so that Alembic knows which tables the project has and what their structures are.
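For illustration, the relevant env.py edit might look like this (myproject.models is a hypothetical module path; substitute wherever your Base actually lives):

from myproject.models import Base  # hypothetical import path

target_metadata = Base.metadata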

Auto-generate a migration (here, the initial one):

alembic revision --autogenerate -m <log_message>

I always use the date and time as the log_message; for example, on Windows:

alembic revision --autogenerate -m "%date:~0,4%%date:~5,2%%date:~8,2%%time:~0,2%%time:~3,2%%time:~6,2%"

When auto-generating a migration, Alembic compares the tables that currently exist in the database against the tables described by the configured Base.metadata and emits code for the differences. On the first run the two sides are usually identical, so the generated migration script does nothing.

Running the command above produces alembic_foo/versions/xxxx.py, where xxxx is the revision ID plus the log_message.

"""20200326 91627

Revision ID: bab1fb444e93
Revises:
Create Date: 2020-03-26 09:16:31.264071

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bab1fb444e93'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

Upgrade the database to the latest version:

alembic upgrade head

This replays the commands in upgrade(); since upgrade() is empty for now, it does nothing.

Alembic also did one other thing: it created a table named alembic_version in the database, holding a single row with a single column that records the current database version.
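You can inspect it directly; the table's single column is named version_num:

select * from alembic_version;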

From now on, the steps for changing the database model are:

(1) Modify the code on the SQLAlchemy Base side, i.e. the Python classes defined through the ORM that represent your table structures.

(2) Run alembic revision --autogenerate ... to generate a migration script.

(3) Review the auto-generated script and fix anything inaccurate. (For example, a rename is generated as a drop plus an add, which would lose data; see the sketch after this list.)

(4) Run alembic upgrade head to apply the changes to the database.
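As a sketch of step (3): if a column were renamed, say Student.name to full_name (hypothetical names), the autogenerated drop/add pair could be replaced with a true rename:

def upgrade():
    # replaces the generated op.drop_column(...) / op.add_column(...) pair
    op.alter_column('student', 'name', new_column_name='full_name')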

Downgrading or upgrading the database

Upgrading to the latest version was covered above:

alembic upgrade head

To target a specific version, look at the Revision ID inside the auto-generated .py file; it is also the prefix of the file name.

Upgrade to a specific version:

alembic upgrade <Revision ID>

Downgrade to a specific version:

alembic downgrade <Revision ID>
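For example, to fall back to the initial revision generated above:

alembic downgrade bab1fb444e93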

References:

https://alembic.sqlalchemy.org/en/latest/tutorial.html

https://www.cnblogs.com/blackmatrix/p/6236573.html

Watch the main thread for hangs; on a hang, report the error and deliberately crash the process, printing the call stack at that moment.
Combined with an external monitor that restarts the process when it crashes, this gives automatic restart on hang.

import threading
import traceback
import time
import sys
import os
from functools import wraps

class WatchDog(threading.Thread):

    def __init__(self, timeout=10, echo=False):
        super(WatchDog, self).__init__()
        self.timeout = timeout
        self.echo = echo
        self.last_kicked_ts = time.time()
        self.lock = threading.Lock()
        self.thread_id = threading.currentThread().ident
        self.terminated = False
        self.setDaemon(True)
        self.start()

    def terminate(self):
        self.terminated = True
        self.join(self.timeout)

    def kick(self):
        self.lock.acquire()
        self.last_kicked_ts = time.time()
        self.lock.release()

    def bark(self):
        formated_frame_stack = self._get_formated_frame_stack()
        if self.echo:
            print("!!!!! WATCH DOG FAILURE TRIGGERED !!!!!\n" + formated_frame_stack, flush=True)
        pid = os.getpid()
        os.kill(pid, 2)  # ask the process to exit (SIGINT)
        time.sleep(5)    # give it 5 seconds
        os.kill(pid, 9)  # then force it to exit (SIGKILL)

    def run(self):
        while not self.terminated:
            ts = time.time()
            self.lock.acquire()
            is_timeout = ts - self.last_kicked_ts > self.timeout
            self.lock.release()
            if is_timeout:
                self.bark()
            n = int(max(self.timeout / 3, 1))
            for i in range(n*10):
                time.sleep(0.1)
                if self.terminated:
                    break

    @staticmethod
    def _get_thread(tid):
        for t in threading.enumerate():
            if t.ident == tid:
                return t
        return None

    @staticmethod
    def _get_frame_stack(tid):
        for thread_id, stack in sys._current_frames().items():
            if thread_id == tid:
                return stack
        return None

    def _get_formated_frame_stack(self):
        info = []
        th = self._get_thread(self.thread_id)
        stack = self._get_frame_stack(self.thread_id)
        info.append('%s thread_id=%d' % (th.name, self.thread_id))
        for filename, lineno, _, line in traceback.extract_stack(stack):
            info.append(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        return '\n'.join(info)


def watch_dog(timeout=10, echo=False):
    def inner(func):
        @wraps(func)
        def wrapper(*args, **kw):
            dog = WatchDog(timeout=timeout, echo=echo)
            ret = func(*args, **kw)
            dog.terminate()
            return ret
        return wrapper
    return inner

Example 1: watching a function for timeout

@watch_dog(timeout=3, echo=True)
def func():
    # do something
    time.sleep(5)

def main():
    func()

if __name__ == '__main__':
    main()

We watch func with a 3-second timeout and sleep for 5 seconds inside it to trigger the timeout. The output looks like this:

!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thread_id=2164
at main()(watch_dog.py:97)
at func()(watch_dog.py:94)
at ret = func(*args, **kw)(watch_dog.py:81)
at time.sleep(5)(watch_dog.py:91)
[Finished in 3.5s with exit code 2]

The program deliberately crashed 3.5 seconds after it started.

Example 2: watching a loop for timeout

def main():
    watch_dog = WatchDog(timeout=3, echo=True)

    # main loop
    # while True:
    for i in range(0, 10):
        time.sleep(i)
        print(f"i={i}")
        watch_dog.kick()
    watch_dog.terminate()


if __name__ == '__main__':
    main()

The output is as follows:

i=0
i=1
i=2
i=3
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thread_id=9060
at main()(watch_dog.py:101)
at time.sleep(i)(watch_dog.py:94)
[Finished in 10.1s with exit code 2]

Example 3: automatic retry on timeout

import multiprocessing

def main():
    ...

if __name__ == '__main__':
    RETRY_TIMES = 5
    for i in range(RETRY_TIMES):  # retry count
        child_process = multiprocessing.Process(target=main)
        child_process.start()
        child_process.join()

        if child_process.exitcode == 0:
            print("child process exited normally")
            exit(0)
        elif i < RETRY_TIMES - 1:
            print("child process exited abnormally, retrying")
        else:
            print("child process exited abnormally, retry limit reached")
            exit(child_process.exitcode)

Change the timezone with:

tzselect

Follow the on-screen prompts and select Asia/Shanghai; in my case the choices were 4-9-1.

Then add

TZ='Asia/Shanghai'; export TZ

to .profile so that it survives reboots.

Change the file that /etc/localtime links to:

ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

Change the contents of /etc/timezone:

echo Asia/Shanghai > /etc/timezone

If tzselect fails right at this first step, install tzdata first. Alternatively, the commands below do everything in one go:

export TZ=Asia/Shanghai \
DEBIAN_FRONTEND=noninteractive
apt update -yqq \
&& apt install -yqq tzdata \
&& ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo ${TZ} > /etc/timezone \
&& dpkg-reconfigure --frontend noninteractive tzdata

A driver enables a design similar to the Active Object pattern, and a general message mechanism is implemented on top of it; programming against the driver keeps components well decoupled.

# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
    time_sec: float
    cb: Any = field(compare=False)


class Driver:
    def __init__(self):
        self._scheduled = queue.PriorityQueue()
        self._scheduled_every_lock = threading.Lock()
        self._scheduled_every = []
        self._callbacks_lock = threading.Lock()
        self._callbacks = {}
        self._async_queue = queue.Queue()
        self._epoch_sec = time.time()
        self._debug_epoch = None
        self._last_epoch = None
        self._sigterm_received = False

    # Get the current time inside the driver
    def get_epoch(self):
        return self._epoch_sec

    # Set a debug time; once set, the driver stops updating the current time and uses the debug time instead
    def set_debug_epoch(self, epoch):
        self._debug_epoch = epoch

    # Main loop
    # wait_sync_interval: time between loop iterations; 0 effectively occupies a whole core
    # intercept_signals: signals to intercept and handle; by default CTRL+C and kill make the main loop exit
    def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
        if intercept_signals:
            self.intercept_sigterm(intercept_signals)
        while not self.sigterm_received():
            self.run_step(wait_sync_interval)

    # Run one pass of the main logic; normally called from the main loop
    def run_step(self, wait_sync_interval=0):
        if self._debug_epoch:
            self._epoch_sec = self._debug_epoch
        else:
            self._epoch_sec = time.time()
        if self._last_epoch is not None:
            if self._epoch_sec - self._last_epoch < wait_sync_interval:
                t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
                time.sleep(t)
                self._epoch_sec = time.time()
        self._last_epoch = self._epoch_sec

        self._do_async()
        self._do_schedule()
        self._do_schedule_every()

    # Schedule a one-shot timed task
    def schedule(self, cb, time_sec):
        self._scheduled.put_nowait(ScheduledItem(time_sec, cb))

    # Schedule a repeating task
    def schedule_every(self, cb, interval_sec):
        self._scheduled_every_lock.acquire()
        self._scheduled_every.append({"next_sec": self._epoch_sec + interval_sec, "interval": interval_sec, "cb": cb})
        self._scheduled_every_lock.release()

    # Add a message receiver
    def add_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type not in self._callbacks:
            self._callbacks[topic_or_type] = set()
        self._callbacks[topic_or_type].add(cb)
        self._callbacks_lock.release()
        return cb

    # Remove a message receiver
    def remove_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type in self._callbacks:
            if cb in self._callbacks[topic_or_type]:
                self._callbacks[topic_or_type].remove(cb)
        self._callbacks_lock.release()

    # Send a message synchronously
    def send(self, obj, topic=None):
        if topic is None:
            topic = type(obj)
        cbs = []
        self._callbacks_lock.acquire()
        if topic in self._callbacks.keys():
            cbs = list(self._callbacks[topic])
        self._callbacks_lock.release()
        for cb in cbs:
            cb(obj)

    # Send a message asynchronously
    def send_async(self, obj, topic=None):
        self._async_queue.put_nowait((obj, topic))

    def _do_async(self):
        while not self._async_queue.empty():
            self.send(*self._async_queue.get_nowait())

    def _do_schedule(self):
        while not self._scheduled.empty():
            item = self._scheduled.get_nowait()
            if item.time_sec > self._epoch_sec:
                self._scheduled.put_nowait(item)
                break
            item.cb(self._epoch_sec)

    def _do_schedule_every(self):
        cbs = []
        self._scheduled_every_lock.acquire()
        for o in self._scheduled_every:
            while self._epoch_sec >= o["next_sec"]:
                cbs.append(o["cb"])
                o["next_sec"] += o["interval"]
        self._scheduled_every_lock.release()
        for cb in cbs:
            cb(self._epoch_sec)

    # Intercept termination signals
    def intercept_sigterm(self, intercept_signals):
        for sig in intercept_signals:
            signal.signal(sig, bind(Driver._on_signal, self))

    # Whether a termination signal has been received
    def sigterm_received(self):
        return self._sigterm_received

    def _on_signal(self, signum, stack):
        print(f"Signal #{signum} received, Traceback:")
        for filename, lineno, _, line in traceback.extract_stack(stack):
            print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        self._sigterm_received = True


def bind(func, *args, **kw):
    return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
    print("initialize")

    driver.schedule(lambda epoch: print("Do sth. one time"), driver.get_epoch() + 3)
    driver.schedule_every(lambda epoch: print("Do sth. regularly"), 1)
    driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch() + 10)
    driver.run(0.1)

    print("finalize")

Reading table information from a SQL Server (version 2005) database:

select
name as 'table_name',
create_date AS 'create_time',
modify_date AS 'update_time'
from sys.tables
where type = 'U'

Reading column information from a SQL Server (version 2005) table:

SELECT
a.name as 'column_name',
b.name as 'data_type',
COLUMNPROPERTY(a.id,a.name,'PRECISION') as 'data_length',
COLUMNPROPERTY(a.id,a.name,'Scale') as 'data_scale',
case when exists(SELECT 1 FROM sysobjects where xtype='PK' and parent_obj=a.id and name in (
SELECT name FROM sysindexes WHERE indid in( SELECT indid FROM sysindexkeys WHERE id = a.id AND colid=a.colid))) then 1 else 0 end as 'primary_key',
COLUMNPROPERTY(a.id,a.name,'IsIdentity') as 'autoincrement',
a.isnullable as 'nullable',
e.text as 'column_default',
g.value as 'column_comment'
FROM syscolumns a
left join systypes b on a.xusertype=b.xusertype
left join sysobjects d on a.id=d.id and d.xtype='U'
left join syscomments e on a.cdefault=e.id
left join sys.extended_properties g on a.id=g.major_id and a.colid=g.minor_id
where d.name='Role'
order by a.id, a.colorder

Sample query results:

column_name  data_type  data_length  data_scale  primary_key  autoincrement  nullable  column_default  column_comment
id           int        10           0           1            1              0
name         nvarchar   50                       0            0              0                         名称
description  nvarchar   300                      0            0              1                         描述

Reading table information from a MySQL (version 5.7) database:

SELECT
TABLE_NAME AS 'table_name',
TABLE_ROWS AS 'table_rows',
CREATE_TIME AS 'create_time',
UPDATE_TIME AS 'update_time'
FROM
information_schema.TABLES
WHERE TABLE_SCHEMA ='test'

Here TABLE_SCHEMA is the database name.
Sample query results:

table_name  table_rows  create_time      update_time
goods       5           2020/1/15 17:10  2020/1/15 17:10

Note that update_time is when the table structure was last changed, not when the table data was last changed; in my tests create_time also changed along with it, for reasons I do not know.

Reading column information from a MySQL (version 5.7) table:

SELECT
COLUMN_NAME as 'column_name',
DATA_TYPE as 'data_type',
IFNULL(CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION) as 'data_length',
NUMERIC_SCALE as 'data_scale',
COLUMN_TYPE as 'column_type',
IF(COLUMN_KEY='PRI', 1, 0) as 'primary_key',
IF(EXTRA='auto_increment', 1, 0) as 'autoincrement',
IF(IS_NULLABLE='YES', 1, 0) as 'nullable',
COLUMN_DEFAULT as 'column_default',
COLUMN_COMMENT as 'column_comment'
FROM
information_schema.COLUMNS
WHERE TABLE_SCHEMA ='test' and TABLE_NAME = 'goods'

Here TABLE_SCHEMA is the database name and TABLE_NAME is the table name.
Sample query results:

column_name  data_type  data_length  data_scale  column_type    primary_key  autoincrement  nullable  column_default  column_comment
id           int        10           0           int(11)        1            1              0
name         varchar    100                      varchar(100)   0            0              1                         名称
price        decimal    18           2           decimal(18,2)  0            0              0         0               价格

# -*- coding: utf-8 -*-
from redis import Redis
import socket
import os
import threading
import time
import uuid
from contextlib import ExitStack, contextmanager

# Lock time slice.
# A lock held for longer than one time slice must repeatedly call expire to extend its TTL.
# The point is that the lock is released automatically, and quickly, when the program dies
# unexpectedly, so the slice should not be too long. It should not be too short either:
# the renewal thread would hit redis too often and waste resources, and under heavy load
# it might fail to renew in time. 5~20 seconds is recommended.
LOCK_TIME_SLICE = 10


class KeepLockThread(threading.Thread):

    def __init__(self, redis, key_name):
        super().__init__()
        self.redis = redis
        self.key_name = key_name

        self.terminated = False
        self.lock = threading.Lock()

    def run(self):
        while True:
            time.sleep(LOCK_TIME_SLICE//3+1)

            with self.lock:
                if not self.terminated:
                    self.redis.expire(self.key_name, LOCK_TIME_SLICE)
                else:
                    return

    def stop(self):
        with self.lock:
            self.terminated = True


# Mutex
class Mutex:
    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.key_name = "MUTEX_" + name
        hostname = socket.gethostname()
        pid = os.getpid()
        tid = threading.get_ident()
        # Build a unique ID; hostname, PID and TID are included for debugging
        self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"
        self.redis = Redis(host=server)
        self.thread = None

    # blocking: whether to block
    # shortlived: whether the lock is held only briefly; if so, no extra thread is created
    # to maintain the TTL. "Briefly" means the hold time is guaranteed to be less than
    # LOCK_TIME_SLICE. The hold time runs from acquiring the lock to releasing it and
    # excludes time spent blocked waiting; you can wait a long time and the lock can
    # still count as short-lived.
    def acquire(self, blocking=True, shortlived=False):
        r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
        if blocking:
            while not r:
                time.sleep(0.05)
                r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
        if r:
            if not shortlived:
                # The lock was acquired, so start a thread to keep extending its TTL
                self.thread = KeepLockThread(self.redis, self.key_name)
                self.thread.daemon = True
                self.thread.start()
        return r

    def release(self):
        if self.acquired():
            if self.thread:
                self.thread.stop()
                self.thread = None
            self.redis.delete(self.key_name)

    def acquired(self):
        r = self.redis.get(self.key_name)
        return r is not None and r.decode() == str(self.id)

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_value, exc_trackback):
        self.release()
        if exc_value is not None:
            raise exc_value


def mutex(names, server="127.0.0.1"):
    def medium(func):
        def wrapper(*args, **kw):
            if type(names) == list:
                with ExitStack() as stack:
                    for name in names:
                        stack.enter_context(Mutex(name, server))
                    return func(*args, **kw)
            else:
                with Mutex(names, server):
                    return func(*args, **kw)
        wrapper.__name__ = func.__name__
        wrapper.__doc__ = func.__doc__
        return wrapper
    return medium


class KeepReadLockThread(threading.Thread):

    def __init__(self, redis, lock_id, rlock_name):
        super().__init__()
        self.redis = redis
        self.id = lock_id
        self.rlock_name = rlock_name

        self.terminated = False
        self.lock = threading.Lock()

    def run(self):
        while True:
            time.sleep(LOCK_TIME_SLICE//3+1)

            with self.lock:
                if not self.terminated:
                    pipeline = self.redis.pipeline()
                    now = time.time()
                    pipeline.zadd(self.rlock_name, {self.id: int(now*1000)})
                    pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
                    pipeline.execute()
                else:
                    return

    def stop(self):
        with self.lock:
            self.terminated = True


# Read-write lock
class ReadWriteLock:

    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.server = server
        self.rlock_name = "RLOCK_" + name
        self.wlock_name = "WLOCK_" + name
        self.meta_lock_name = "META_" + name

        hostname = socket.gethostname()
        pid = os.getpid()
        tid = threading.get_ident()
        # Build a unique ID; hostname, PID and TID are included for debugging
        self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"

        self.redis = Redis(host=server)
        self.lock_type = None
        self.thread = None

    def acquire_read_lock(self, blocking=True, shortlived=False):
        # This mutex protects the read-write lock's own internal data;
        # it must be released before leaving any ReadWriteLock method
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            mutex.acquire(shortlived=True)
            wlock_locked = self.redis.get(self.wlock_name)
            if wlock_locked:
                if blocking:
                    while wlock_locked:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire(shortlived=True)
                        wlock_locked = self.redis.get(self.wlock_name)
                else:
                    return False

            pipeline = self.redis.pipeline()
            now = time.time()
            # Drop read locks that expired a while ago (proactively clean up read locks that were never released)
            pipeline.zremrangebyscore(self.rlock_name, 0, int((now-LOCK_TIME_SLICE)*1000))
            # Add the new read lock
            pipeline.zadd(self.rlock_name, {self.id: int(now*1000)})
            pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
            pipeline.execute()
            self.lock_type = 'R'

            if not shortlived:
                # Start a thread to keep extending the read lock's TTL
                self.thread = KeepReadLockThread(self.redis, self.id, self.rlock_name)
                self.thread.daemon = True
                self.thread.start()

            return True
        finally:
            mutex.release()

    def acquire_write_lock(self, blocking=True, shortlived=False):
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            # Note that this mutex is always acquired with shortlived=True; that is not decided
            # by whether the caller's use of the read-write lock is short-lived. The mutex is
            # only held briefly inside the lock logic, to guarantee that between seeing zero
            # read locks and taking the write lock no new read locks appear.
            mutex.acquire(shortlived=True)
            # Drop read locks that expired a while ago (proactively clean up read locks that were never released)
            self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000))
            # Count the current read locks
            r = self.redis.zcard(self.rlock_name)
            if r:
                if blocking:
                    while r:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire(shortlived=True)
                        # Drop read locks that expired a while ago
                        self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000))
                        # Count the current read locks
                        r = self.redis.zcard(self.rlock_name)
                else:
                    return False
            # Try to take the write lock
            r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
            if blocking:
                while not r:
                    mutex.release()
                    time.sleep(0.05)
                    # The write lock was not acquired, so read locks may have been taken
                    # elsewhere in the meantime; wait again until the read-lock count is zero
                    mutex.acquire(shortlived=True)
                    self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000))
                    r = self.redis.zcard(self.rlock_name)
                    if r:
                        if blocking:
                            while r:
                                mutex.release()
                                time.sleep(0.05)
                                mutex.acquire(shortlived=True)
                                self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000))
                                r = self.redis.zcard(self.rlock_name)
                        else:
                            return False
                    r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
            if r:
                if not shortlived:
                    self.thread = KeepLockThread(self.redis, self.wlock_name)
                    self.thread.daemon = True
                    self.thread.start()

                self.lock_type = 'W'
            return r
        finally:
            mutex.release()

    def acquired(self, lock_type="any"):
        if lock_type == "any":
            return self.lock_type is not None
        else:
            return self.lock_type == lock_type

    def release(self):
        if self.thread:
            self.thread.stop()
            self.thread = None
        if self.lock_type == 'R':
            self.redis.zrem(self.rlock_name, self.id)
            self.lock_type = None
        elif self.lock_type == 'W':
            r = self.redis.get(self.wlock_name)
            if r is not None and r.decode() == str(self.id):
                self.redis.delete(self.wlock_name)
            self.lock_type = None


@contextmanager
def read_lock(name, server="127.0.0.1"):
    rwlock = ReadWriteLock(name=name, server=server)
    try:
        rwlock.acquire_read_lock()
        yield rwlock
    finally:
        rwlock.release()


@contextmanager
def write_lock(name, server="127.0.0.1"):
    rwlock = ReadWriteLock(name=name, server=server)
    try:
        rwlock.acquire_write_lock()
        yield rwlock
    finally:
        rwlock.release()

Mutex supports blocking and non-blocking acquisition; blocking is the default.
Crashing while holding the lock is accounted for: the key is given an expiry in redis so it is deleted automatically on timeout, a long-held lock keeps renewing the expiry from a dedicated thread, and if the holder crashes after acquiring, the lock is released automatically within one time slice.

A ReadWriteLock is implemented on top of Mutex; it likewise supports blocking and non-blocking acquisition and automatic release after a crash. A usage sketch follows.
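A brief usage sketch (assuming a redis server on 127.0.0.1; the lock names are made up):

# blocking acquisition as a context manager
with Mutex("report_job"):
    pass  # critical section

# non-blocking acquisition
m = Mutex("report_job")
if m.acquire(blocking=False, shortlived=True):
    try:
        pass  # critical section, guaranteed shorter than LOCK_TIME_SLICE
    finally:
        m.release()

# read-write lock through the context-manager helpers
with read_lock("config"):
    pass  # any number of readers may hold this at once
with write_lock("config"):
    pass  # exclusive access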

A wrapper base class

The model base class, plus code for engine management, transaction handling, and JSON serialization.

The shared code, db_.py:

# -*- coding: utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
from urllib import parse
import re
import datetime
import types
import decimal


class ModelBase(object):

    @declared_attr
    def __tablename__(cls):
        # return cls.__name__.lower()
        return re.sub(r'([A-Z])', r'_\1', cls.__name__[0].lower()+cls.__name__[1:]).lower()

    @classmethod
    def props(cls):
        if cls.__base__.__name__ == "Base":
            return [c for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_column_names = [c for c in cls.__base__.__table__.columns]
            column_names = [c for c in cls.__table__.columns if c.name != 'id']
            return super_column_names + column_names
        else:
            assert False, "props over more than two inheritance levels is not implemented yet"

    @classmethod
    def prop_names(cls):
        if cls.__base__.__name__ == "Base":
            return [c.name for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_column_names = [c.name for c in cls.__base__.__table__.columns]
            column_names = [c.name for c in cls.__table__.columns if c.name != 'id']
            return super_column_names + column_names
        else:
            assert False, "prop_names over more than two inheritance levels is not implemented yet"

    @classmethod
    def prop(cls, prop_name):
        return cls.__table__.columns[prop_name]

    # How the object serializes to a string when printed
    def __repr__(self):
        attrs = []
        # for c in self.__table__.columns:
        for name in self.prop_names():
            attr = getattr(self, name)
            if type(attr) in (str, datetime.date, datetime.time, datetime.datetime):
                attrs.append(f"{name}='{attr}'")
            else:
                attrs.append(f"{name}={attr}")
        return f"{self.__class__.__name__}({', '.join(attrs)})"

    # Key list used when converting to a dict
    def keys(self):
        return self.prop_names()

    def __getitem__(self, item):
        return getattr(self, item)

    def __setitem__(self, item, value):
        return setattr(self, item, value)

    def to_dict(self):
        return {name: to_jsonable(getattr(self, name)) for name in self.keys()}


def to_dict(db_obj):
    if isinstance(db_obj, ModelBase):
        return db_obj.to_dict()
    else:
        return db_obj

def to_list(db_objs):
    return [to_dict(db_obj) for db_obj in db_objs]


# Base = declarative_base(cls=ModelBase)


# Turn an ORM object into something JSON-serializable: a list of ORM objects
# becomes a list of dicts, a single ORM object becomes a dict
def to_jsonable(o):
    if isinstance(o, list):
        return [to_jsonable(e) for e in o]
    if isinstance(o, dict):
        return {k: to_jsonable(v) for (k, v) in o.items()}
    if isinstance(o, ModelBase):
        return o.to_dict()
    if isinstance(o, datetime.datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(o, datetime.date):
        return o.strftime('%Y-%m-%d')
    if isinstance(o, datetime.time):
        return o.strftime('%H:%M:%S')
    if isinstance(o, decimal.Decimal):
        return float(o)
    return o


engines = {}

r"""
Example URLs for different databases:
r"sqlite:///:memory:"                                            sqlite in-memory database
r"sqlite:///C:\path\foo.db"                                      sqlite on Windows
r"sqlite:////root/data/foo.db"                                   sqlite on Linux
r"mysql+pymysql://username:password@server/dbname"               MySQL
r"postgresql+psycopg2://username:password@server/dbname"         PostgreSQL
r"mssql+pymssql://username:password@server/dbname?charset=utf8"  Microsoft SQL Server
"""
def init_engine(
        url=None, name="main",
        dialect=None, username="", password="", server="", dbname=None,
        **kwargs):
    if dialect:
        if dialect.lower() in ("mysql", ):
            dialect = "mysql+pymysql"
        elif dialect.lower() in ("postgresql", "postgres", "pgsql"):
            dialect = "postgresql+psycopg2"
        elif dialect.lower() in ("mssql", "sqlserver"):
            dialect = "mssql+pymssql"
    if url is None:
        if password:
            url = '{}://{}:{}@{}/{}'.format(dialect, username, parse.quote_plus(password), server, dbname)
        elif username:
            url = '{}://{}@{}/{}'.format(dialect, username, server, dbname)
        else:
            url = '{}://{}/{}'.format(dialect, server, dbname)

    # # Set a default connection pool size
    # if "pool_size" not in kwargs:
    #     kwargs["pool_size"] = 5

    # # Set a default connection recycle time (MySQL defaults to 8 hours)
    # if "pool_recycle" not in kwargs:
    #     kwargs["pool_recycle"] = 28000

    # Enable pool_pre_ping by default
    if "pool_pre_ping" not in kwargs:
        kwargs["pool_pre_ping"] = True

    engine = create_engine(url, **kwargs)
    session_maker = sessionmaker(expire_on_commit=False)
    session_maker.configure(bind=engine)

    @contextmanager
    def _session_scope(maker):
        session = maker()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    engine.session_scope = types.MethodType(_session_scope, session_maker)
    engines[name] = engine
    return engine


def exit_engine(name="main"):
    if name in engines:
        del engines[name]


def get_engine(name="main"):
    if name in engines:
        return engines[name]
    else:
        return None


def session_scope(name="main"):
    if name in engines:
        return engines[name].session_scope()
    else:
        raise Exception("engine is not initialized")

Database model (table) definitions

db_test.py builds on db_.py to define and use a database: it shows how to define relationship mappings, foreign keys, unique keys and indexes, and how to convert query results into JSON-serializable objects (so they can be returned from a RESTful API).

# -*- coding: utf-8 -*-
from sqlalchemy import Table, Column, Integer, Float, Numeric, String, DateTime, Date, Time, Boolean
from sqlalchemy import PrimaryKeyConstraint, ForeignKey, Index, UniqueConstraint
from sqlalchemy import text, func, and_, or_, not_, asc, inspect, desc, distinct
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base

from utils.db_ import ModelBase, init_engine, exit_engine, get_engine, session_scope, to_jsonable

Base = declarative_base(cls=ModelBase)


class Clazz(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    enroll_year = Column(Integer(), comment="enrollment year")


class Student(Base):
    __table_args__ = (
        UniqueConstraint('clazz_id', 'name'),               # multi-column unique constraint
        Index('ix_clazz_id_stu_no', 'clazz_id', 'stu_no'),  # multi-column index
    )
    id = Column(Integer(), primary_key=True, autoincrement=True)  # auto-increment single-column primary key; for a composite key, set primary_key=True on several columns
    stu_no = Column(String(), index=True, unique=True, comment="student number")  # single-column index and single-column unique constraint
    name = Column(String(), nullable=True, comment="name")  # nullable=False would create a NOT NULL constraint
    clazz_id = Column(Integer(), ForeignKey('clazz.id', ondelete='RESTRICT'), nullable=True, comment="class ID")  # foreign key

    clazz = relationship('Clazz')  # many-to-one (one-to-many) relationship mapping


class Course(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    name = Column(String(), comment="course name")

    students = relationship('Student', secondary='student_course', backref=backref('courses'))  # many-to-many mapping; backref adds a courses attribute to Student

    def keys(self):
        return super().keys() + ["students", ]


# Course enrollment (many-to-many relationship table)
student_course = Table('student_course', Base.metadata,
    Column('student_id', Integer(), ForeignKey('student.id'), primary_key=True),
    Column('course_id', Integer(), ForeignKey('course.id'), primary_key=True))


def main():
    engine = init_engine(url=r"sqlite:///:memory:")
    Base.metadata.create_all(engine)  # create all tables from the definitions
    with session_scope() as session:
        clazz = Clazz(enroll_year=2022)  # create a clazz object
        session.add(clazz)               # insert a row from the clazz object
        session.flush()                  # execute immediately
        session.refresh(clazz)           # refresh clazz so the id of the newly created row is filled in
        print(clazz)

        course = Course(name="math")
        session.add(course)
        session.flush()
        course = session.query(Course).filter(Course.name == "math").first()  # query back the row just inserted
        print(course)

        student = Student(stu_no="007", name="Zhang San", clazz_id=clazz.id)
        student.courses.append(course)  # backref gave student a courses attribute, through which the relation table is maintained
        session.add(course)
        session.flush()

        session.refresh(course)
        print(course.students)
        print(to_jsonable(course))  # to_jsonable needs the overridden keys() above to include the students attribute

        student.name = "Li Si"
        session.flush()  # this effectively performs an UPDATE

        # run a raw SQL query
        r = session.execute("select id, name from student").fetchall()
        for e in r:
            print(e)  # the name has changed to Li Si


if __name__ == '__main__':
    main()
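
A sketch of the rollback behavior session_scope provides (a hypothetical addition to the demo above): any exception raised inside the with block triggers session.rollback(), so the insert below never becomes visible.

def rollback_demo():
    try:
        with session_scope() as session:
            session.add(Clazz(enroll_year=2023))
            raise RuntimeError("boom")  # aborts the scope; session.rollback() runs
    except RuntimeError:
        pass
    with session_scope() as session:
        # prints 0: the insert above was rolled back
        print(session.query(Clazz).filter(Clazz.enroll_year == 2023).count())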

Generating model definitions from an existing database with sqlacodegen

Install it via pip:

pip install sqlacodegen

The command format is sqlacodegen followed by the same kind of URL used to initialize an engine:

sqlacodegen <sqldialect>[+<sqldriver>]://<username>:<password>@<server>/<dbname>

The command prints the generated code to the console; redirect it into a .py file.

Example:

sqlacodegen mysql+pymysql://mzdai:123456@192.168.1.140/mzdai > db_mzdai.py

You can then edit the generated code, e.g. import parts of db_ and swap the model base class for declarative_base(cls=ModelBase), as sketched below.
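For example, the header of the generated file might be adjusted like this (a sketch; the generated model classes themselves stay unchanged):

from sqlalchemy.ext.declarative import declarative_base
from utils.db_ import ModelBase  # the module from the earlier example

Base = declarative_base(cls=ModelBase)  # replaces sqlacodegen's plain declarative_base()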

Temporary change, lost after a reboot:

hostname xxx

Permanent change, takes effect after a reboot:

vim /etc/hostname

Update the name-resolution file as well:

vim /etc/hosts

The default sources are hosted overseas and downloads are slow. You can switch to a domestic mirror, such as Alibaba's, by replacing /etc/apt/sources.list with:

# Aliyun mirror
deb http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
# pre-release packages; may be left disabled
deb http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse

The codenames of the three current mainstream LTS releases are:

18.04 LTS bionic

20.04 LTS focal

22.04 LTS jammy

For other releases, replace bionic with the corresponding codename.

You can also write the file directly with tee instead of opening it in vim or nano; for example, for 20.04 LTS without the pre-release entries:

tee /etc/apt/sources.list <<-'EOF'
deb http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
EOF

Update the package index:

apt-get update

If it reports NO_PUBKEY 3B4FE6ACC0B21F32, run:

apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32

Python threads provide no way to terminate a thread from the outside. Granted, a program should be designed to avoid needing this, but sometimes we want to treat a thread timeout as an exception and terminate the thread early. The code below offers one possible approach.

import threading
import inspect
import ctypes

def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def kill_thread(tid):
    _async_raise(tid, SystemExit)

In my testing this works in most cases, except when the thread has been sitting in time.sleep for a long time; in other words, the code above cannot wake a sleeping thread and then terminate it. A usage sketch follows.
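A usage sketch (the worker function and timings are illustrative). The asynchronous exception is delivered the next time the target thread executes Python bytecode, which is why a thread parked in a long sleep is not interrupted until the sleep returns:

import time

def worker():
    while True:
        time.sleep(0.5)  # short sleeps: the pending SystemExit fires once sleep returns

t = threading.Thread(target=worker, daemon=True)
t.start()
kill_thread(t.ident)  # raises SystemExit inside the worker thread
t.join(timeout=2)
print("alive:", t.is_alive())  # normally False by now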