
View the running state of system processes, including services:

systemctl status

All available unit files live in the /lib/systemd/system/ and /etc/systemd/system/ directories.

In my experience, .service files should be placed under /lib/systemd/system/; when a service is enabled to start at boot, a symlink pointing to the file under /lib/systemd/system/ is created automatically under /etc/systemd/system/.

List all installed services:

systemctl list-units --type=service

The status output of a service shows which .service file it was started from.

For example,

service mongodb status

shows

/lib/systemd/system/mongodb.service

and, most importantly, the command it runs:

ExecStart=/usr/bin/mongod --unixSocketPrefix=${SOCKETPATH} --config ${CONF} $DAEMON_OPTS

Note that the program given to ExecStart should block in the foreground and must not daemonize itself; if it does not block, systemd assumes the program has finished and considers the service no longer running.

Take Kafka as an example:

[Unit]
Description=Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
ExecStart=/opt/kafka_2.12-2.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-2.3.1/config/server.properties
Restart=on-failure
RestartPreventExitStatus=255

[Install]
WantedBy=multi-user.target
Alias=kafka.service

For details on how to write .service files, see http://www.jinbuguo.com/systemd/systemd.service.html

After modifying a unit file, reload the systemd configuration:

systemctl daemon-reload

Enable a service to start at boot:

systemctl enable postgresql.service

Check whether a service is enabled to start at boot:

systemctl is-enabled postgresql.service

Disable a service from starting at boot:

systemctl disable postgresql.service

Alembic requires SQLAlchemy. If a project is written in Python on top of SQLAlchemy, Alembic can be used to manage database schema versions.

Alembic calls the set of changes between two versions a migration, and the process of applying them migrating: we speak of migrating from one version to another.

First, initialize Alembic, which creates a new directory:

alembic init <YOUR_ALEMBIC_DIR>

For example:

alembic init alembic_foo, where foo could be the name of the project's database.

Two files need to be edited:

alembic.ini and alembic_foo/env.py

In alembic.ini, set sqlalchemy.url.

For example: sqlalchemy.url = postgresql+psycopg2://username:password@ip_address/dbname

In env.py, edit the line # target_metadata = mymodel.Base.metadata

Import the Base class the project uses to define its data models and change the line to target_metadata = Base.metadata, so that Alembic knows which tables the project has and what their structures are.
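As a sketch, the relevant part of env.py might end up looking like this; the module path myproject.models is a placeholder for wherever your Base actually lives:

from myproject.models import Base  # placeholder import path; use your own models module

# was: target_metadata = None (or the commented-out example)
target_metadata = Base.metadata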

Auto-generate a migration (here, the initial one):

alembic revision --autogenerate -m <log_message>

I always use the date and time as the log_message, e.g. (Windows cmd syntax):

alembic revision --autogenerate -m "%date:~0,4%%date:~5,2%%date:~8,2%%time:~0,2%%time:~3,2%%time:~6,2%"

When auto-generating a migration, Alembic compares the tables that currently exist in the database with those described by the configured Base.metadata and generates code for the differences. The first time we generate, the two sides are usually identical, so the resulting migration script does nothing at all.

Running the command above generates alembic_foo/versions/xxxx.py, where xxxx is the revision ID plus the log_message.

"""20200326 91627

Revision ID: bab1fb444e93
Revises:
Create Date: 2020-03-26 09:16:31.264071

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bab1fb444e93'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

Upgrade the database to the latest version:

alembic upgrade head

This runs the commands in upgrade(); since ours is currently empty, nothing happens.

In effect, Alembic creates a table named alembic_version in the database; it holds a single row with a single column recording the current schema version.
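If you are curious, you can inspect that table directly. A minimal sketch, assuming the same connection URL as in alembic.ini:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://username:password@ip_address/dbname")
with engine.connect() as conn:
    # alembic_version has a single column, version_num
    print(conn.execute(text("SELECT version_num FROM alembic_version")).scalar())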

From then on, the steps for changing the database model are:

(1) Modify the code on the SQLAlchemy Base side, i.e. the Python classes that describe the table structures through the ORM.

(2) Run alembic revision --autogenerate ... to generate a migration script.

(3) Review the auto-generated script and correct anything inaccurate (for example, a rename is generated as a drop plus an add, which would lose data; see the sketch after this list).

(4) Run alembic upgrade head to apply the changes to the database.
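As an illustration of step (3), here is a sketch of hand-correcting an auto-generated rename, assuming a hypothetical student table whose name column is being renamed to full_name:

from alembic import op
import sqlalchemy as sa

# autogenerate typically emits a drop plus an add for a rename, losing the data:
#     op.add_column('student', sa.Column('full_name', sa.String()))
#     op.drop_column('student', 'name')

# hand-corrected version that preserves the data:
def upgrade():
    op.alter_column('student', 'name', new_column_name='full_name')

def downgrade():
    op.alter_column('student', 'full_name', new_column_name='name')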

Upgrading or downgrading the database

Upgrading to the latest version was covered above:

alembic upgrade head

To target a specific version, use the Revision ID found inside the auto-generated .py file; it is also the prefix of the file name.

Upgrade to a specific version:

alembic upgrade <Revision ID>

Downgrade to a specific version:

alembic downgrade <Revision ID>

References:

https://alembic.sqlalchemy.org/en/latest/tutorial.html

https://www.cnblogs.com/blackmatrix/p/6236573.html

Watch the main thread for hangs; when a hang is detected, report the error and deliberately crash the process, printing the call stack at that moment. Combined with an external supervisor that restarts the process when it crashes, this yields automatic restart on hang.

import threading
import traceback
import time
import sys
import os
from functools import wraps

class WatchDog(threading.Thread):

    def __init__(self, timeout=10, echo=False):
        super(WatchDog, self).__init__()
        self.timeout = timeout
        self.echo = echo
        self.last_kicked_ts = time.time()
        self.lock = threading.Lock()
        self.thread_id = threading.currentThread().ident
        self.terminated = False
        self.setDaemon(True)
        self.start()

    def terminate(self):
        self.terminated = True
        self.join(self.timeout)

    def kick(self):
        self.lock.acquire()
        self.last_kicked_ts = time.time()
        self.lock.release()

    def bark(self):
        formated_frame_stack = self._get_formated_frame_stack()
        if self.echo:
            print("!!!!! WATCH DOG FAILURE TRIGGERED !!!!!\n" + formated_frame_stack, flush=True)
        pid = os.getpid()
        os.kill(pid, 2)  # ask the process to exit (SIGINT)
        time.sleep(5)    # wait 5 seconds
        os.kill(pid, 9)  # then force it to exit (SIGKILL)

    def run(self):
        while not self.terminated:
            ts = time.time()
            self.lock.acquire()
            is_timeout = ts - self.last_kicked_ts > self.timeout
            self.lock.release()
            if is_timeout:
                self.bark()
            n = int(max(self.timeout / 3, 1))
            for i in range(n * 10):
                time.sleep(0.1)
                if self.terminated:
                    break

    @staticmethod
    def _get_thread(tid):
        for t in threading.enumerate():
            if t.ident == tid:
                return t
        return None

    @staticmethod
    def _get_frame_stack(tid):
        for thread_id, stack in sys._current_frames().items():
            if thread_id == tid:
                return stack
        return None

    def _get_formated_frame_stack(self):
        info = []
        th = self._get_thread(self.thread_id)
        stack = self._get_frame_stack(self.thread_id)
        info.append('%s thead_id=%d' % (th.name, self.thread_id))
        for filename, lineno, _, line in traceback.extract_stack(stack):
            info.append(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        return '\n'.join(info)


def watch_dog(timeout=10, echo=False):
    def inner(func):
        @wraps(func)
        def wrapper(*args, **kw):
            dog = WatchDog(timeout=timeout, echo=echo)
            ret = func(*args, **kw)
            dog.terminate()
            return ret
        return wrapper
    return inner

Example 1: watching a function for timeout

@watch_dog(timeout=3, echo=True)
def func():
    # do something
    time.sleep(5)

def main():
    func()

if __name__ == '__main__':
    main()

We watch func with a 3-second timeout and sleep for 5 seconds inside it to trigger the timeout. The output is:

!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=2164
at main()(watch_dog.py:97)
at func()(watch_dog.py:94)
at ret = func(*args, **kw)(watch_dog.py:81)
at time.sleep(5)(watch_dog.py:91)
[Finished in 3.5s with exit code 2]

The program deliberately crashed 3.5 seconds after starting.

Example 2: watching a loop for timeout

def main():
    watch_dog = WatchDog(timeout=3, echo=True)

    # main loop
    # while True:
    for i in range(0, 10):
        time.sleep(i)
        print(f"i={i}")
        watch_dog.kick()
    watch_dog.terminate()


if __name__ == '__main__':
    main()

The output:

i=0
i=1
i=2
i=3
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=9060
at main()(watch_dog.py:101)
at time.sleep(i)(watch_dog.py:94)
[Finished in 10.1s with exit code 2]

Example 3: automatic retry after timeout

import multiprocessing

def main():
    ...

if __name__ == '__main__':
    RETRY_TIMES = 5
    for i in range(RETRY_TIMES):  # number of attempts
        child_process = multiprocessing.Process(target=main)
        child_process.start()
        child_process.join()

        if child_process.exitcode == 0:
            print("child process exited normally")
            exit(0)
        elif i < RETRY_TIMES - 1:
            print("child process exited abnormally, retrying")
        else:
            print("child process exited abnormally, retry limit reached")
    exit(child_process.exitcode)

Use

tzselect

to change the time zone.

Follow the on-screen prompts to select Asia/Shanghai; in my case the choices were 4-9-1.

Then add

TZ='Asia/Shanghai'; export TZ

to .profile so that the setting survives a reboot.

Update the file that /etc/localtime links to:

ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

Update the contents of /etc/timezone:

echo Asia/Shanghai > /etc/timezone

If tzselect fails at the very first step, install tzdata first; or use the following to do everything in one go:

export TZ=Asia/Shanghai \
DEBIAN_FRONTEND=noninteractive
apt update -yqq \
&& apt install -yqq tzdata \
&& ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo ${TZ} > /etc/timezone \
&& dpkg-reconfigure --frontend noninteractive tzdata

The Driver below supports a design similar to the Active Object pattern, with a generic message mechanism implemented on top of it; programming against the driver decouples components thoroughly.

# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
    time_sec: float
    cb: Any = field(compare=False)


class Driver:
    def __init__(self):
        self._scheduled = queue.PriorityQueue()
        self._scheduled_every_lock = threading.Lock()
        self._scheduled_every = []
        self._callbacks_lock = threading.Lock()
        self._callbacks = {}
        self._async_queue = queue.Queue()
        self._epoch_sec = time.time()
        self._debug_epoch = None
        self._last_epoch = None
        self._sigterm_received = False

    # Current time as seen inside the driver
    def get_epoch(self):
        return self._epoch_sec

    # Set a debug time; once set, the driver stops updating its current time and uses the debug time instead
    def set_debug_epoch(self, epoch):
        self._debug_epoch = epoch

    # Main loop
    # wait_sync_interval: time between iterations; 0 amounts to busy-waiting on one core
    # intercept_signals: signals to intercept; by default CTRL+C and kill make the main loop exit
    def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
        if intercept_signals:
            self.intercept_sigterm(intercept_signals)
        while not self.sigterm_received():
            self.run_step(wait_sync_interval)

    # Run one pass of the main logic; normally called from the main loop
    def run_step(self, wait_sync_interval=0):
        if self._debug_epoch:
            self._epoch_sec = self._debug_epoch
        else:
            self._epoch_sec = time.time()
        if self._last_epoch is not None:
            if self._epoch_sec - self._last_epoch < wait_sync_interval:
                t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
                time.sleep(t)
                self._epoch_sec = time.time()
        self._last_epoch = self._epoch_sec

        self._do_async()
        self._do_schedule()
        self._do_schedule_every()

    # Schedule a one-shot timed task
    def schedule(self, cb, time_sec):
        self._scheduled.put_nowait(ScheduledItem(time_sec, cb))

    # Schedule a recurring task
    def schedule_every(self, cb, interval_sec):
        self._scheduled_every_lock.acquire()
        self._scheduled_every.append({"next_sec": self._epoch_sec + interval_sec, "interval": interval_sec, "cb": cb})
        self._scheduled_every_lock.release()

    # Add a message receiver
    def add_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type not in self._callbacks:
            self._callbacks[topic_or_type] = set()
        self._callbacks[topic_or_type].add(cb)
        self._callbacks_lock.release()
        return cb

    # Remove a message receiver
    def remove_receiver(self, topic_or_type, cb):
        self._callbacks_lock.acquire()
        if topic_or_type in self._callbacks:
            if cb in self._callbacks[topic_or_type]:
                self._callbacks[topic_or_type].remove(cb)
        self._callbacks_lock.release()

    # Send a message synchronously
    def send(self, obj, topic=None):
        if topic is None:
            topic = type(obj)
        cbs = []
        self._callbacks_lock.acquire()
        if topic in self._callbacks.keys():
            cbs = list(self._callbacks[topic])
        self._callbacks_lock.release()
        for cb in cbs:
            cb(obj)

    # Send a message asynchronously
    def send_async(self, obj, topic=None):
        self._async_queue.put_nowait((obj, topic))

    def _do_async(self):
        while not self._async_queue.empty():
            self.send(*self._async_queue.get_nowait())

    def _do_schedule(self):
        while not self._scheduled.empty():
            item = self._scheduled.get_nowait()
            if item.time_sec > self._epoch_sec:
                self._scheduled.put_nowait(item)
                break
            item.cb(self._epoch_sec)

    def _do_schedule_every(self):
        cbs = []
        self._scheduled_every_lock.acquire()
        for o in self._scheduled_every:
            while self._epoch_sec >= o["next_sec"]:
                cbs.append(o["cb"])
                o["next_sec"] += o["interval"]
        self._scheduled_every_lock.release()
        for cb in cbs:
            cb(self._epoch_sec)

    # Intercept termination signals
    def intercept_sigterm(self, intercept_signals):
        for sig in intercept_signals:
            signal.signal(sig, bind(Driver._on_signal, self))

    # Whether a termination signal has been received
    def sigterm_received(self):
        return self._sigterm_received

    def _on_signal(self, signum, stack):
        print(f"Signal #{signum} received, Traceback:")
        for filename, lineno, _, line in traceback.extract_stack(stack):
            print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
        self._sigterm_received = True


def bind(func, *args, **kw):
    return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
    print("initialize")

    driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch() + 3)
    driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1)
    driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch() + 10)
    driver.run(0.1)

    print("finalize")

SQL Server (version 2005): reading table information from a database

select
name as 'table_name',
create_date AS 'create_time',
modify_date AS 'update_time'
from sys.tables
where type = 'U'

SQL Server (version 2005): reading column information from a table

SELECT
a.name as 'column_name',
b.name as 'data_type',
COLUMNPROPERTY(a.id,a.name,'PRECISION') as 'data_length',
COLUMNPROPERTY(a.id,a.name,'Scale') as 'data_scale',
case when exists(SELECT 1 FROM sysobjects where xtype='PK' and parent_obj=a.id and name in (
SELECT name FROM sysindexes WHERE indid in( SELECT indid FROM sysindexkeys WHERE id = a.id AND colid=a.colid))) then 1 else 0 end as 'primary_key',
COLUMNPROPERTY(a.id,a.name,'IsIdentity') as 'autoincrement',
a.isnullable as 'nullable',
e.text as 'column_default',
g.value as 'column_comment'
FROM syscolumns a
left join systypes b on a.xusertype=b.xusertype
left join sysobjects d on a.id=d.id and d.xtype='U'
left join syscomments e on a.cdefault=e.id
left join sys.extended_properties g on a.id=g.major_id and a.colid=g.minor_id
where d.name='Role'
order by a.id, a.colorder

Sample query result:

column_name data_type data_length data_scale primary_key autoincrement nullable column_default column_comment
id int 10 0 1 1 0
name nvarchar 50 0 0 0 Name
description nvarchar 300 0 0 1 Description

MySQL (version 5.7): reading table information from a database

SELECT
TABLE_NAME AS 'table_name',
TABLE_ROWS AS 'table_rows',
CREATE_TIME AS 'create_time',
UPDATE_TIME AS 'update_time'
FROM
information_schema.TABLES
WHERE TABLE_SCHEMA ='test'

Here TABLE_SCHEMA is the database name.
Sample query result:

table_name table_rows create_time update_time
goods 5 2020/1/15 17:10 2020/1/15 17:10

Note that update_time is when the table structure was last changed, not when the table's data was last changed; and in my tests create_time changed along with it, for reasons I don't know.

MySQL (version 5.7): reading column information from a table

SELECT
COLUMN_NAME as 'column_name',
DATA_TYPE as 'data_type',
IFNULL(CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION) as 'data_length',
NUMERIC_SCALE as 'data_scale',
COLUMN_TYPE as 'column_type',
IF(COLUMN_KEY='PRI', 1, 0) as 'primary_key',
IF(EXTRA='auto_increment', 1, 0) as 'autoincrement',
IF(IS_NULLABLE='YES', 1, 0) as 'nullable',
COLUMN_DEFAULT as 'column_default',
COLUMN_COMMENT as 'column_comment'
FROM
information_schema.COLUMNS
WHERE TABLE_SCHEMA ='test' and TABLE_NAME = 'goods'

Here TABLE_SCHEMA is the database name and TABLE_NAME is the table name.
Sample query result:

column_name data_type data_length data_scale column_type primary_key autoincrement nullable column_default column_comment
id int 10 0 int(11) 1 1 0
name varchar 100 varchar(100) 0 0 1 Name
price decimal 18 2 decimal(18,2) 0 0 0 0 Price
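Since the rest of these notes lean on SQLAlchemy, here is a sketch of running the column query from Python; the connection details (user, password, host, database test, table goods) are placeholders:

from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://username:password@127.0.0.1/test")
sql = text("""
    SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT
    FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = :schema AND TABLE_NAME = :table
""")
with engine.connect() as conn:
    for row in conn.execute(sql, {"schema": "test", "table": "goods"}):
        print(row)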

# -*- coding: utf-8 -*-
from redis import Redis
import time
import uuid

# Mutex (a distributed mutual-exclusion lock)
class Mutex:
    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.key_name = "MUTEX_" + name
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)

    def acquire(self, blocking=True, ex=120):
        r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
        if blocking:
            while not r:
                time.sleep(0.01)
                r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
        return r

    def release(self):
        if self.acquired():
            self.redis.delete(self.key_name)

    def acquired(self):
        r = self.redis.get(self.key_name)
        return r is not None and r.decode() == str(self.id)

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_value, exc_trackback):
        self.release()
        if exc_value is not None:
            raise exc_value


# Read-write lock
class ReadWriteLock:
    MAX_READ_TIME = 120  # maximum time a read lock may be held (seconds)
    '''
    MAX_READ_TIME is normally never reached. It exists for the case where a process crashes
    after locking without handling the release properly, which would leave the lock held
    forever by a dead process; with MAX_READ_TIME set, a read lock that has been held longer
    than that is released automatically.
    '''

    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.server = server
        self.rlock_name = "RLOCK_" + name
        self.wlock_name = "WLOCK_" + name
        self.meta_lock_name = "META_" + name
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)
        self.lock_type = None

    def acquire_read_lock(self, blocking=True):
        # this mutex protects the read-write lock's own bookkeeping;
        # it must be released before leaving any ReadWriteLock method
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            mutex.acquire()
            wlock_locked = self.redis.get(self.wlock_name)
            if wlock_locked:
                if blocking:
                    while wlock_locked:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire()
                        wlock_locked = self.redis.get(self.wlock_name)
                else:
                    return False

            pipeline = self.redis.pipeline()
            now = time.time()
            # drop read locks that expired a while ago (cleans up read locks never released because of a crash)
            pipeline.zremrangebyscore(self.rlock_name, 0, int((now - self.MAX_READ_TIME) * 1000))
            # add the new read lock
            pipeline.zadd(self.rlock_name, {self.id: int(now * 1000)})
            pipeline.expire(self.rlock_name, self.MAX_READ_TIME)
            pipeline.execute()
            self.lock_type = 'R'
            return True
        finally:
            mutex.release()

    def acquire_write_lock(self, blocking=True, ex=120):
        mutex = Mutex(self.meta_lock_name, self.server)
        try:
            mutex.acquire()
            # drop read locks that expired a while ago (cleans up read locks never released because of a crash)
            self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - self.MAX_READ_TIME) * 1000))
            # count the read locks currently held
            r = self.redis.zcard(self.rlock_name)
            if r:
                if blocking:
                    while r:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire()
                        # drop read locks that expired a while ago
                        self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - self.MAX_READ_TIME) * 1000))
                        # count the read locks currently held
                        r = self.redis.zcard(self.rlock_name)
                else:
                    return False
            # try to take the write lock
            r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
            if blocking:
                while not r:
                    mutex.release()
                    time.sleep(0.05)
                    mutex.acquire()
                    self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - self.MAX_READ_TIME) * 1000))
                    r = self.redis.zcard(self.rlock_name)
                    if r:
                        if blocking:
                            while r:
                                mutex.release()
                                time.sleep(0.05)
                                mutex.acquire()
                                self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time() - self.MAX_READ_TIME) * 1000))
                                r = self.redis.zcard(self.rlock_name)
                        else:
                            return False
                    r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
            if r:
                self.lock_type = 'W'
            return r
        finally:
            mutex.release()

    def acquired(self, lock_type="any"):
        return self.lock_type is not None if lock_type == "any" else self.lock_type == lock_type

    def release(self):
        if self.lock_type == 'R':
            self.redis.zrem(self.rlock_name, self.id)
            self.lock_type = None
        elif self.lock_type == 'W':
            r = self.redis.get(self.wlock_name)
            if r is not None and r.decode() == str(self.id):
                self.redis.delete(self.wlock_name)
            self.lock_type = None

Both blocking and non-blocking acquisition are supported, blocking by default.
Crashing while holding a lock is handled: the lock expires automatically after a timeout.
Timing out without crashing is also considered (in that case you should first adjust the timeout setting, or restructure the code being locked, e.g. lock in several shorter segments).
When this abnormal case occurs, one side's lock may have been released while it is still accessing the resource, and another side has since acquired the lock. This is an error condition, but we want the system to keep working as well as it can even when it happens.
If there is a genuine access conflict, nothing can be done and failing is acceptable; but there is also the case where, despite the expired lock, no actual conflict occurred and the program could in principle continue normally. What happens then?

Thread A: locks--------------auto-unlocked on timeout|------------------------unlocks manually|
Thread B: ----------tries to lock--------|(lock acquired here)--------------------unlocks manually|
Thread C: ----------------------------------|---------tries to lock-------|(acquiring here would be wrong!)

Although thread A's timeout-triggered auto-unlock caused no actual access conflict with thread B, A's later manual unlock released B's lock, which let thread C acquire it while B had not in fact unlocked, creating a new potential access conflict between B and C.
So a manual unlock should first check whether the lock currently held is its own. That is what the r.decode() == str(self.id) check in the acquired function is for.

We implemented ReadWriteLock on top of Mutex. Note that Mutex's maximum hold time ex is passed in on every acquire, whereas ReadWriteLock's maximum read-lock time is a class attribute (its maximum write-lock time is still passed per call). For Mutex, each acquisition may have a different lifetime; but releasing read locks that expired because of a crash relies on

redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )

and that expiry cutoff applies to all previously taken read locks at once, so the simplest approach is a single, uniform maximum read-lock time.
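A usage sketch for the two locks above, assuming a Redis server reachable at 127.0.0.1:

# blocking usage (the default): waits until the lock is available
m = Mutex("inventory")
with m:       # __enter__ acquires, __exit__ releases
    pass      # critical section

# non-blocking usage: returns immediately, so check the result
if m.acquire(blocking=False):
    try:
        pass  # critical section
    finally:
        m.release()

rw = ReadWriteLock("inventory")
if rw.acquire_read_lock():
    try:
        pass  # read-only section; many readers may hold this concurrently
    finally:
        rw.release()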

Encapsulating a base class

Code for a model base class: engine management, transaction handling, and JSON serialization.

Shared code, db_.py:

# -*- coding: utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
from urllib import parse
import re
import datetime
import types
import decimal


class ModelBase(object):

    @declared_attr
    def __tablename__(cls):
        # return cls.__name__.lower()
        return re.sub(r'([A-Z])', r'_\1', cls.__name__[0].lower() + cls.__name__[1:]).lower()

    @classmethod
    def props(cls):
        if cls.__base__.__name__ == "Base":
            return [c for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_column_names = [c for c in cls.__base__.__table__.columns]
            column_names = [c for c in cls.__table__.columns if c.name != 'id']
            return super_column_names + column_names
        else:
            assert False, "props is not implemented yet for more than two inheritance levels"

    @classmethod
    def prop_names(cls):
        if cls.__base__.__name__ == "Base":
            return [c.name for c in cls.__table__.columns]
        elif cls.__base__.__base__.__name__ == "Base":
            super_column_names = [c.name for c in cls.__base__.__table__.columns]
            column_names = [c.name for c in cls.__table__.columns if c.name != 'id']
            return super_column_names + column_names
        else:
            assert False, "prop_names is not implemented yet for more than two inheritance levels"

    @classmethod
    def prop(cls, prop_name):
        return cls.__table__.columns[prop_name]

    # How the object serializes to a string when printed
    def __repr__(self):
        attrs = []
        # for c in self.__table__.columns:
        for name in self.prop_names():
            attr = getattr(self, name)
            if type(attr) in (str, datetime.date, datetime.time, datetime.datetime):
                attrs.append(f"{name}='{attr}'")
            else:
                attrs.append(f"{name}={attr}")
        return f"{self.__class__.__name__}({', '.join(attrs)})"

    # The list of keys used when converting to a dict
    def keys(self):
        return self.prop_names()

    def __getitem__(self, item):
        return getattr(self, item)

    def __setitem__(self, item, value):
        return setattr(self, item, value)

    def to_dict(self):
        return {name: to_jsonable(getattr(self, name)) for name in self.keys()}


def to_dict(db_obj):
    if isinstance(db_obj, ModelBase):
        return db_obj.to_dict()
    else:
        return db_obj

def to_list(db_objs):
    return [to_dict(db_obj) for db_obj in db_objs]


# Base = declarative_base(cls=ModelBase)


# Convert an ORM object into something JSON-serializable: a list of ORM objects
# becomes a list of dicts, a single ORM object becomes a dict
def to_jsonable(o):
    if isinstance(o, list):
        return [to_jsonable(e) for e in o]
    if isinstance(o, dict):
        return {k: to_jsonable(v) for (k, v) in o.items()}
    if isinstance(o, ModelBase):
        return o.to_dict()
    if isinstance(o, datetime.datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(o, datetime.date):
        return o.strftime('%Y-%m-%d')
    if isinstance(o, datetime.time):
        return o.strftime('%H:%M:%S')
    if isinstance(o, decimal.Decimal):
        return float(o)
    return o


engines = {}

r"""
Example URLs for different databases:
r"sqlite:///:memory:"                                        in-memory SQLite
r"sqlite:///C:\path\foo.db"                                  SQLite on Windows
r"sqlite:////root/data/foo.db"                               SQLite on Linux
r"mysql+pymysql://username:password@server/dbname"           MySQL
r"postgresql+psycopg2://username:password@server/dbname"     PostgreSQL
r"mssql+pymssql://username:password@server/dbname?charset=utf8"  Microsoft SQL Server
"""
def init_engine(
        url=None, name="main",
        dialect=None, username="", password="", server="", dbname=None,
        **kwargs):
    if dialect:
        if dialect.lower() in ("mysql", ):
            dialect = "mysql+pymysql"
        elif dialect.lower() in ("postgresql", "postgres", "pgsql"):
            dialect = "postgresql+psycopg2"
        elif dialect.lower() in ("mssql", "sqlserver"):
            dialect = "mssql+pymssql"
    if url is None:
        if password:
            url = '{}://{}:{}@{}/{}'.format(dialect, username, parse.quote_plus(password), server, dbname)
        elif username:
            url = '{}://{}@{}/{}'.format(dialect, username, server, dbname)
        else:
            url = '{}://{}/{}'.format(dialect, server, dbname)

    # # Default connection-pool size
    # if "pool_size" not in kwargs:
    #     kwargs["pool_size"] = 5

    # # Default connection recycle time (MySQL's default is 8 hours)
    # if "pool_recycle" not in kwargs:
    #     kwargs["pool_recycle"] = 28000

    # Enable pool_pre_ping by default
    if "pool_pre_ping" not in kwargs:
        kwargs["pool_pre_ping"] = True

    engine = create_engine(url, **kwargs)
    session_maker = sessionmaker(expire_on_commit=False)
    session_maker.configure(bind=engine)

    @contextmanager
    def _session_scope(maker):
        session = maker()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    engine.session_scope = types.MethodType(_session_scope, session_maker)
    engines[name] = engine
    return engine


def exit_engine(name="main"):
    if name in engines:
        del engines[name]


def get_engine(name="main"):
    if name in engines:
        return engines[name]
    else:
        return None


def session_scope(name="main"):
    if name in engines:
        return engines[name].session_scope()
    else:
        raise Exception("engine is not initialized")

Database model (table) definitions

db_test.py builds on db_.py to define and use a database: how to define relationship mappings, foreign keys, unique keys, and indexes, and how to convert query results into JSON-serializable objects (e.g. for returning through a RESTful API).

# -*- coding: utf-8 -*-
from sqlalchemy import Table, Column, Integer, Float, Numeric, String, DateTime, Date, Time, Boolean
from sqlalchemy import PrimaryKeyConstraint, ForeignKey, Index, UniqueConstraint
from sqlalchemy import text, func, and_, or_, not_, asc, inspect, desc, distinct
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base

from utils.db_ import ModelBase, init_engine, exit_engine, get_engine, session_scope, to_jsonable

Base = declarative_base(cls=ModelBase)


class Clazz(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    enroll_year = Column(Integer(), comment="enrollment year")


class Student(Base):
    __table_args__ = (
        UniqueConstraint('clazz_id', 'name'),               # multi-column unique constraint
        Index('ix_clazz_id_stu_no', 'clazz_id', 'stu_no'),  # multi-column index
    )
    id = Column(Integer(), primary_key=True, autoincrement=True)  # auto-increment single-column primary key; for a composite key, put primary_key=True on several columns
    stu_no = Column(String(), index=True, unique=True, comment="student number")  # single-column index and unique constraint
    name = Column(String(), nullable=False, comment="name")  # NOT NULL constraint
    clazz_id = Column(Integer(), ForeignKey('clazz.id', ondelete='RESTRICT'), nullable=True, comment="class ID")  # foreign key

    clazz = relationship('Clazz')  # many-to-one (one-to-many) relationship mapping


class Course(Base):
    id = Column(Integer(), primary_key=True, autoincrement=True)
    name = Column(String(), comment="course name")

    students = relationship('Student', secondary='student_course', backref=backref('courses'))  # many-to-many mapping; backref effectively adds a courses attribute to Student

    def keys(self):
        return super().keys() + ["students", ]


# Course enrollment (many-to-many association table)
student_course = Table('student_course', Base.metadata,
                       Column('student_id', Integer(), ForeignKey('student.id'), primary_key=True),
                       Column('course_id', Integer(), ForeignKey('course.id'), primary_key=True))


def main():
    engine = init_engine(url=r"sqlite:///:memory:")
    Base.metadata.create_all(engine)  # create all tables from the definitions
    with session_scope() as session:
        clazz = Clazz(enroll_year=2022)  # create a clazz object
        session.add(clazz)               # insert a row from the clazz object
        session.flush()                  # execute immediately
        session.refresh(clazz)           # refresh clazz so the id of the newly created row is filled in
        print(clazz)

        course = Course(name="Math")
        session.add(course)
        session.flush()
        course = session.query(Course).filter(Course.name == "Math").first()  # query the row just inserted
        print(course)

        student = Student(stu_no="007", name="Zhang San", clazz_id=clazz.id)
        student.courses.append(course)  # backref gave student a courses attribute, through which the association table is maintained
        session.add(course)
        session.flush()

        session.refresh(course)
        print(course.students)
        print(to_jsonable(course))  # the students attribute requires overriding keys() for to_jsonable to include it

        student.name = "Li Si"
        session.flush()  # this effectively executes an UPDATE

        # Run a raw SQL query
        r = session.execute(text("select id, name from student")).fetchall()
        for e in r:
            print(e)  # the name has changed to Li Si


if __name__ == '__main__':
    main()

Using sqlacodegen to generate model definitions from an existing database

Install it with pip:

pip install sqlacodegen

The command is sqlacodegen followed by the same URL used to initialize an engine:

sqlacodegen <sqldialect>[+<sqldriver>]://<username>:<password>@<server>/<dbname>

The command prints the generated code to the console; redirect it into a .py file.

Example:

sqlacodegen mysql+pymysql://mzdai:123456@192.168.1.140/mzdai > db_mzdai.py

You can then edit the generated code, for example import pieces of db_ and replace the models' base class with declarative_base(cls=ModelBase).
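A sketch of that edit at the top of the generated file, assuming db_.py lives in a utils package as in the earlier examples:

from sqlalchemy.ext.declarative import declarative_base
from utils.db_ import ModelBase  # assumed location of db_.py

# was: Base = declarative_base()
Base = declarative_base(cls=ModelBase)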

Temporary change, lost on reboot:

hostname xxx

Permanent change, takes effect after reboot:

vim /etc/hostname

Update the name-resolution file as well:

vim /etc/hosts

The default package sources are hosted abroad and downloads are slow; you can switch to a domestic mirror such as Aliyun's by replacing /etc/apt/sources.list with:

# Aliyun mirror
deb http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-backports main restricted universe multiverse
# Pre-release packages; may be left disabled
deb http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ bionic-proposed main restricted universe multiverse

The codenames of the three current mainstream releases are

18.04 LTS bionic

20.04 LTS focal

22.04 LTS jammy

For other releases, replace bionic with the corresponding codename.

You can also write the file directly with tee instead of opening it in vim or nano. For 20.04 LTS, without the pre-release entries:

tee /etc/apt/sources.list <<-'EOF'
deb http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
EOF

Update the package index:

apt-get update

If it complains NO_PUBKEY 3B4FE6ACC0B21F32,

run

apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32