0%

服务器其他用户反馈读取数据库很卡。

通过 sar -d -p 3 命令发现硬盘占用率比较高

通过 iotop 命令发现主要是被一个名为 [jbd2/sda2-8] 的进程占用

网上说法是 (13条消息) 性能分析之IO分析-jbd2引起的IO高_hualusiyu的专栏-CSDN博客

1
jbd2的全称是journaling block driver 。这个进程实现的是文件系统的日志功能,磁盘使用日志功能来保证数据的完整性。这个需要评估一下安全和性能哪个更重要,解决方案是升级内核或者牺牲完整性来换性能。

差点被误导。

而使用命令 atop -d 发现其实是 snapd 占用

和这篇帖子情况一样 snapd持续运行,引起jbd2/sda2-8持续访问硬盘,占用大量io - Ubuntu中文论坛

1
2
3
[#6](https://forum.ubuntu.org.cn/viewtopic.php?p=3221983#p3221983)
帖子 由 sffred » 2020-06-06 16:04
我最终解决这个问题的方式是卸载snapd。反正我也用不着

snapd是ubuntu预装的一个软件包管理工具。

使用 snap list 发现只有一个core,也就是我没有基于snap安装过软件包。

通过 service snapd stop 关闭snapd,再通过 sar -d -p 3 观察硬盘占用,已经完全正常

至此确定是由snapd引发.

通过service snapd start 启动snapd,观察硬盘占用,先是再次上升数十秒后回到了正常。

如果下次再出现占用过高准备禁用或卸载snap。

禁用 systemctl disable snapd.service 卸载 apt purge snapd

相关命令

持续观察硬盘读写情况,每3秒刷新一次

1
sar -d -p 3

sar常用的的参数还有监控CPU情况的

1
sar -u 3

按IO从高到低排序监控进程,实时刷新

1
iotop

也能按IO从高到低排序,实时刷新,感觉比iotop好用

1
atop -d

atop也有监控CPU情况的

1
atop -u

在需要被挂载的服务器上安装nfs-kernel-server

1
sudo apt install nfs-kernel-server

/etc/exports

通过此文件配置共享目录,样例

1
2
3
4
5
6
7
8
9
10
# /etc/exports: the access control list for filesystems which may be exported
# to NFS clients. See exports(5).
#
# Example for NFSv2 and NFSv3:
# /srv/homes hostname1(rw,sync,no_subtree_check) hostname2(ro,sync,no_subtree_check)
#
# Example for NFSv4:
# /srv/nfs4 gss/krb5i(rw,sync,fsid=0,crossmnt,no_subtree_check)
# /srv/nfs4/homes gss/krb5i(rw,sync,no_subtree_check)
/data/share 192.168.1.157(insecure,rw,sync,all_squash,no_subtree_check) *(insecure,ro,async,all_squash,no_subtree_check)

这里配置了/data/share为共享目录,允许从192.168.1.157进行读写操作,允许从其他任意IP进行只读操作。

把共享目录的owner改成 nobody:nogroup

1
sudo chown nobody:nogroup /data/share

设置共享目录的权限

1
sudo chmod 777 /data/share

重新加载配置文件

1
sudo exportfs -a

在需要挂载的服务器需要安装nfs-common

1
apt install nfs-common

挂载命令样例 ,把140上的/data/share共享目录挂载到本地/mnt/140share

1
mount 192.168.1.140:/data/share /mnt/140share

端口

使用到的端口,如果有防火墙需要设置

111/tcp+udp

2049/tcp

不同版本的NFS服务器也可能需要更多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Data ONTAP:
111 TCP/UDP portmapper
2049 TCP/UDP nfsd
635 TCP/UDP mountd
4045 TCP/UDP nlockmgr
4046 TCP/UDP status
4049 TCP/UDP rquotad

Data ONTAP 7-Mode:
111 TCP/UDP portmapper
2049 TCP/UDP nfsd
4046 TCP/UDP mountd
4045 TCP/UDP nlockmgr
4047 TCP/UDP status
4049 TCP/UDP rquotad

参考:Which Network File System (NFS) TCP and NFS UDP ports are used on the storage system? - NetApp Knowledge Base

另外可能用到一个随机端口,在/etc/default/nfs-kernel-server中通过

1
RPCMOUNTDOPTS="--port 33333"

可以使之固定为33333。

更改设置后重启服务

1
service nfs-kernel-server restart

如果希望系统启动时自动加载文件系统

/etc/fstab

通过此文件配置系统启动时自动挂载

样例

1
2
# <file system> <mount point>   <type>  <options>       <dump>  <pass>
192.168.1.140:/data/share /mnt/140share nfs defaults 0 0

重新加载配置文件

1
sudo mount -a

配置中type是被挂载的路径的类型

常用的类型有:

nfs表示远程linux的共享路径

cifs表示远程windows的共享路径

ext4表示本地ext4路径

配置中options是挂载选项,在挂载windows目录时,通过以下方式指定用户名和密码

1
defaults,auto,username=MZhDai,password=******,dir_mode=0777,file_mode=0777

另外挂载windows目录需要 mount.cifs 支持,如果没有可以通过以下命令安装

1
sudo apt install cifs-utils

挂载命令通过 -o指定选项

1
sudo mount.cifs //192.168.1.107/f /mnt/107f -o user=MZhDai,pass=******,dir_mode=0777,file_mode=0777

还有常用的选项如

1
2
3
4
5
ro :只读挂载
rw : 读写挂载
uid=`id -u <owner>` : 指定挂载后的所有者
gid=`id -g <group>` : 指定挂载后的所有组
iocharset=utf8 : 指定编码为uft8,解决中文乱码问题,我在WSL2挂载Windows共享遇到过一次

.cifs 可以省略,mount命令会自动识别需要挂载的路径类型

访问挂载的共享目录卡住

执行

1
ls /mnt

1
df -h

时卡住

可以使用

1
strace ls /mnt

1
strace df -h

查看命令执行的过程,看看最终卡在哪一步,比如卡在访问/mnt/abc

查看挂载参数

1
cat /proc/mounts

查看服务器支持的版本

1
nfsstat -s

卸载挂载点

1
umount <挂载点>

如果卡住试下强制卸载

1
umount -f <挂载点>

1
umount -lf <挂载点>

如果还是卸载失败可以看一下哪个进程在使用挂载点下的文件

1
lsof|grep <挂载点>

考虑杀死进程后再卸载

指定挂载使用的NFS协议版本

1
mount.nfs XXX XXX -o nfsvers=3

一般是v4版本卡住,改用v3可以解决

OSError: [Errno 37] Unable to open file

增加nolock挂载参数

1
mount.nfs XXX XXX -o nfsvers=3,nolock

主服务器:192.168.1.99

从服务器:192.168.1.150

配置主服务器

在主服务器创建repl用户

1
CREATE ROLE repl login replication password 'd71ea3'; 

配置repl用户访问权限

1
vim /etc/postgresql/10/main/pg_hba.conf
1
host    replication     repl            192.168.1.150/32               md5

配置主服务器

1
vim /etc/postgresql/10/main/postgresql.conf
1
2
3
4
5
6
7
8
9
wal_level = replica

archive_mode = on # enables archiving; off, on, or always
# (change requires restart)
archive_command = 'rsync -zaq %p postgres@192.168.1.150:/var/lib/postgresql/wal_restore/%f && test ! -f /var/lib/postgresql/archivedir/%f && cp %p /var/lib/postgresql/archivedir/%f'

max_wal_senders = 10
wal_keep_segments = 64
wal_sender_timeout = 60s

归档命令不加入rsync也可以,只需要在建立主从同步时手动把完整备份之后的归档复制到从库,后面配置从库时候会提到。

我实际使用的归档命令还加入了自动删除旧数据

1
archive_command = 'DIR=/var/lib/postgresql/archivedir; test ! -f $DIR/%f && cp --preserve=timestamps %p $DIR/%f; find $DIR -type f -mtime +31|xargs rm -f'

重启服务

1
service postgresql restart

配置从服务器

停止服务

1
service postgresql stop

删除所有数据

1
2
3
cd /var/lib/postgresql/10/main

rm -rf *

配置从服务器

1
vim /etc/postgresql/10/main/postgresql.conf
1
hot_standby = on

切换到postgres用户

1
sudo su - postgres

从服务器上从主服务器创建初始备份,上面切换用户是为了不用调整文件权限

1
pg_basebackup -h 192.168.1.99 -U repl -D /var/lib/postgresql/10/main -F p -X stream -P -R -p 5432

Password: d71ea3

会自动生成 recovery.conf 启动之后会读取里面的配置进行主从同步

编辑recovery.conf 在结尾追加

1
2
restore_command = 'cp /var/lib/postgresql/wal_restore/%f %p'
archive_cleanup_command = 'pg_archivecleanup /var/lib/postgresql/wal_restore/ %r'

切回root

1
sudo su -

回到主服务器,如果前面主服务器归档命令没有加入rsync,那么我们现在在主服务器上 复制最近一天内修改过的归档文件到从服务器

1
2
cd /var/lib/postgresql/archivedir
find -mtime -1|xargs -n 1 -I{} scp /var/lib/postgresql/archivedir/{} postgres@192.168.1.150:/var/lib/postgresql/wal_restore/

相当于整体上从库建立起同步需要的数据 = 完整备份 + 归档文件 + WAL缓存

从服务器启动服务

1
service postgresql start

从主服务器查看从服务器同步状态

1
select application_name, sync_state from pg_stat_replication;

查看系统进程运行状态,包括服务

1
systemctl status

所有可用的单元文件存放在 /lib/systemd/system/ 和 /etc/systemd/system/ 目录。

根据我的实验情况是我们应该在/lib/systemd/system/ 下存放.service文件,当设置了自启动后,会自动在 /etc/systemd/system/ 下创建一个软链接指向 /lib/systemd/system/ 下的文件。

查看所有已安装服务:

1
systemctl list-units --type=service

通过服务状态可以查看启动服务的.service配置文件

例如

1
service mongodb status

可以看到

1
/lib/systemd/system/mongodb.service

最重要的,运行命令,

1
ExecStart=/usr/bin/mongod --unixSocketPrefix=${SOCKETPATH} --config ${CONF} $DAEMON_OPTS

PS:要注意的是ExecStart指定的是一个阻塞的程序,不需要后台执行,如果不阻塞,服务会认为程序执行完了,认为服务不在启动状态。

以Kafka为例

1
2
3
4
5
6
7
8
9
10
11
12
13
[Unit]
Description=Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
ExecStart=/opt/kafka_2.12-2.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-2.3.1/config/server.properties
Restart=on-failure
RestartPreventExitStatus=255

[Install]
WantedBy=multi-user.target
Alias=kafka.service

详细的.service文件编写方法可以参考 http://www.jinbuguo.com/systemd/systemd.service.html

修改服务配置文件后需要

1
systemctl daemon-reload

设置服务开机自启动

1
systemctl enable postgresql.service

查询是否自启动服务

1
systemctl is-enabled postgresql.service

取消服务器开机自启动

1
systemctl disable postgresql.service

Alembic需要SQLAlchemy支持,如果项目是使用Python基于SQLAlchemy的开发的,那么可以用Alembic管理数据库版本变化。

Alembic将版本间的一组变化称为一个迁移,将变化过程称为迁移。我们可以说从一个版本迁移到另一个版本。

我们先新建一个目录alembic

初始化Alembic

1
alembic init <YOUR_ALEMBIC_DIR>

例如

alembic init alembic_foo,foo可以是项目的数据库名

有两个文件需要修改

alembic.ini alembic_foo/env.py

alembic.ini 中修改sqlalchemy.url

例如 sqlalchemy.url = postgresql+psycopg2://用户名:密码@ip_address/dbname

env.py 修改 # target_metadata = mymodel.Base.metadata

修改把项目中用来定义数据模型基类的Base import进来,改成 target_metadata = Base.metadata,以便Alembic知道项目中有哪些表,表结构是什么

自动生成迁移(生成初始迁移)

1
alembic revision --autogenerate -m <log_message>

我都是用日期和时间作为log_message,例如

1
alembic revision --autogenerate -m "%date:~0,4%%date:~5,2%%date:~8,2%%time:~0,2%%time:~3,2%%time:~6,2%"

自动生成迁移的时候,Alembic会对比数据库现有的表结构和配置的Base.metadata对应的表结构,生成差异转换代码,我们首次生成的时候一般两边是一致的,所以生成的迁移脚本其实是什么都不做。

执行上面的命令可以看到会生成 alembic_foo/versions/xxxx.py ,其中xxx就是revision+log_message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"""20200326 91627

Revision ID: bab1fb444e93
Revises:
Create Date: 2020-03-26 09:16:31.264071

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bab1fb444e93'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

把数据库升级到最新版本

1
alembic upgrade head

其实会做一遍upgrade中的命令,现在为空就什么都不做。

其实alembic做了一件事,在数据库中创建了一张名为alembic_version的表,里面只有一行一列记录着当前的数据库版本

以后要修改数据库模型时的步骤如下:

(1)修改SQLAlchemy Base那边的代码,即通过ORM框架定义的表示数据表结构的Python类。

(2)执行 alembic revision --autogenerate ... 命令生成迁移脚本。

(3)检查自动生成的脚本,改成不准确的地方。(例如重命名会变成删除再添加,会丢失数据)

(4)执行 alembic upgrade head 把改动应用到数据库

降级或升级数据库

升级到最新前面已经说过了

1
alembic upgrade head

要指定版本的话,看到前面自动生成的py文件里面有个Revision ID,同时也是py文件的前缀。

升级到指定版本

1
alembic upgrade <Revision ID>

降级到指定版本

1
alembic downgrade <Revision ID>

参考:

https://alembic.sqlalchemy.org/en/latest/tutorial.html

https://www.cnblogs.com/blackmatrix/p/6236573.html

监视主线程卡死,卡死时保错再主动崩溃退出,同时打印当时的调用堆栈。
还可以再外部结合监视崩溃自动重启就可以在卡死时实现自动重启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import threading
import traceback
import time
import sys
import os
from functools import wraps

class WatchDog(threading.Thread):

def __init__(self, timeout=10, echo=False):
super(WatchDog, self).__init__()
self.timeout = timeout
self.echo = echo
self.last_kicked_ts = time.time()
self.lock = threading.Lock()
self.thread_id = threading.currentThread().ident
self.terminated = False
self.setDaemon(True)
self.start()

def terminate(self):
self.terminated = True
self.join(self.timeout)

def kick(self):
self.lock.acquire()
self.last_kicked_ts = time.time()
self.lock.release()

def bark(self):
formated_frame_stack = self._get_formated_frame_stack()
if self.echo:
print("!!!!! WATCH DOG FAILURE TRIGGERED !!!!!\n" + formated_frame_stack, flush=True)
pid = os.getpid()
os.kill(pid, 2) # 通知进程退出
time.sleep(5) # 等待5秒
os.kill(pid, 9) # 发送强制退出

def run(self):
while not self.terminated:
ts = time.time()
self.lock.acquire()
is_timeout = ts - self.last_kicked_ts > self.timeout
self.lock.release()
if is_timeout:
self.bark()
n = int(max(self.timeout / 3, 1))
for i in range(n*10):
time.sleep(0.1)
if self.terminated:
break

@staticmethod
def _get_thread(tid):
for t in threading.enumerate():
if t.ident == tid:
return t
return None

@staticmethod
def _get_frame_stack(tid):
for thread_id, stack in sys._current_frames().items():
if thread_id == tid:
return stack
return None

def _get_formated_frame_stack(self):
info = []
th = self._get_thread(self.thread_id)
stack = self._get_frame_stack(self.thread_id)
info.append('%s thead_id=%d' % (th.name, self.thread_id))
for filename, lineno, _, line in traceback.extract_stack(stack):
info.append(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
return '\n'.join(info)


def watch_dog(timeout=10, echo=False):
def inner(func):
def wrapper(*args, **kw):
dog = WatchDog(timeout=timeout, echo=echo)
ret = func(*args, **kw)
dog.terminate()
return ret
return wrapper
return inner

用例1,监控函数超时

1
2
3
4
5
6
7
8
9
10
@watch_dog(timeout=3, echo=True)
def func():
# do something
time.sleep(5)

def main():
func()

if __name__ == '__main__':
main()

我们监视func,设置其3秒超时,然后在里面sleep 5秒来引起超时,可以看到输出如下

1
2
3
4
5
6
7
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=2164
at main()(watch_dog.py:97)
at func()(watch_dog.py:94)
at ret = func(*args, **kw)(watch_dog.py:81)
at time.sleep(5)(watch_dog.py:91)
[Finished in 3.5s with exit code 2]

在程序启动3.5秒后就主动崩溃了

用例2,监控循环超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def main():
watch_dog = WatchDog(timeout=3, echo=True)

# main loop
# while True:
for i in range(0, 10):
time.sleep(i)
print(f"i={i}")
watch_dog.kick()
watch_dog.terminated()


if __name__ == '__main__':
main()

输出如下

1
2
3
4
5
6
7
8
9
i=0
i=1
i=2
i=3
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=9060
at main()(watch_dog.py:101)
at time.sleep(i)(watch_dog.py:94)
[Finished in 10.1s with exit code 2]

用例3,超时自动重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing

def main():
...

if __name__ == '__main__':
RETRY_TIMES = 5
for i in range(RETRY_TIMES): # 重试次数
child_process = multiprocessing.Process(target=main)
child_process.start()
child_process.join()

if child_process.exitcode == 0:
print("子进程正常结束")
exit(0)
elif i < RETRY_TIMES - 1:
print("子进程异常结束,即将重试")
else:
print("子进程异常结束,重试超次")
exit(child_process.exitcode)

通过

1
tzselect

修改时区

根据屏幕提示选择选到 Asia/Shanghai 我这里是按4-9-1

然后把

1
TZ='Asia/Shanghai'; export TZ

加入到 .profile 中保证重启后有效

修改 /etc/localtime 链接到的文件

1
ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

修改 /etc/timezone 的文件内容

1
echo Asia/Shanghai > /etc/timezone

如果在第一步tzselect就报错,需要先安装tzdata,也可以用下面的命令一步到位

1
2
3
4
5
6
7
export TZ=Asia/Shanghai \
DEBIAN_FRONTEND=noninteractive
apt update -yqq \
&& apt install -yqq tzdata \
&& ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo ${TZ} > /etc/timezone \
&& dpkg-reconfigure --frontend noninteractive tzdata

通过driver可以进行类似ActiveObject模式的设计,在此基础上还实现了通用的消息机制,基于driver进行编程可以充分的解耦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
time_sec: float
cb: Any = field(compare=False)


class Driver:
def __init__(self):
self._scheduled = queue.PriorityQueue()
self._scheduled_every_lock = threading.Lock()
self._scheduled_every = []
self._callbacks_lock = threading.Lock()
self._callbacks = {}
self._async_queue = queue.Queue()
self._epoch_sec = time.time()
self._debug_epoch = None
self._last_epoch = None
self._sigterm_received = False

# 得到driver内的当前时间
def get_epoch(self):
return self._epoch_sec

# 设置调试时间,设置了调试时间driver内将不更新当前时间而采用调试时间
def set_debug_epoch(self, epoch):
self._debug_epoch = epoch

# 主循环
# wait_sync_interval 每趟循环的时间间隔,如果为0相当于占用一个core
# intercept_signals 拦截并处理信号,默认处理CTRL+C和kill,收到时退出主循环
def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
if intercept_signals:
self.intercept_sigterm(intercept_signals)
while not self.sigterm_received():
self.run_step(wait_sync_interval)

# 执行一趟主逻辑,一般放在主循环中执行
def run_step(self, wait_sync_interval=0):
if self._debug_epoch:
self._epoch_sec = self._debug_epoch
else:
self._epoch_sec = time.time()
if self._last_epoch is not None:
if self._epoch_sec - self._last_epoch < wait_sync_interval:
t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
time.sleep(t)
self._epoch_sec = time.time()
self._last_epoch = self._epoch_sec

self._do_async()
self._do_schedule()
self._do_schedule_every()

# 计划单次定时任务
def schedule(self, cb, time_sec):
self._scheduled.put_nowait( ScheduledItem(time_sec, cb) )

# 计划重复任务
def schedule_every(self, cb, interval_sec):
self._scheduled_every_lock.acquire()
self._scheduled_every.append( { "next_sec":self._epoch_sec+interval_sec, "interval":interval_sec, "cb":cb } )
self._scheduled_every_lock.release()

# 增加消息接收者
def add_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type not in self._callbacks:
self._callbacks[topic_or_type] = set()
self._callbacks[topic_or_type].add(cb)
self._callbacks_lock.release()
return cb

# 删除消息接收者
def remove_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type in self._callbacks:
if cb in self._callbacks[topic_or_type]:
self._callbacks[topic_or_type].remove(cb)
self._callbacks_lock.release()

# 同步发送消息
def send(self, obj, topic=None):
if topic == None:
topic = type(obj)
cbs = []
self._callbacks_lock.acquire()
if topic in self._callbacks.keys():
cbs = list(self._callbacks[topic])
self._callbacks_lock.release()
for cb in cbs:
cb(obj)

# 异步发送消息
def send_async(self, obj, topic=None):
self._async_queue.put_nowait( (obj, topic) )

def _do_async(self):
while not self._async_queue.empty():
self.send(*self._async_queue.get_nowait())

def _do_schedule(self):
i = 0
while not self._scheduled.empty():
item = self._scheduled.get_nowait()
if item.time_sec > self._epoch_sec:
self._scheduled.put_nowait(item)
break
item.cb(self._epoch_sec)

def _do_schedule_every(self):
cbs = []
self._scheduled_every_lock.acquire()
for o in self._scheduled_every:
while self._epoch_sec >= o["next_sec"]:
cbs.append(o["cb"])
o["next_sec"] += o["interval"]
self._scheduled_every_lock.release()
for cb in cbs:
cb(self._epoch_sec)

# 拦截终止信号
def intercept_sigterm(self, intercept_signals):
for sig in intercept_signals:
signal.signal(sig, bind(Driver._on_signal, self))

# 是否收到了终止信号
def sigterm_received(self):
return self._sigterm_received

def _on_signal(self, signum, stack):
print(f"Signal #{signum} received, Traceback:")
for filename, lineno, _, line in traceback.extract_stack(stack):
print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
self._sigterm_received = True


def bind(func, *args, **kw):
return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
print("initialize")

driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch()+3)
driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1)
driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch()+10)
driver.run(0.1)

print("finalize")

SQL Server(version 2005)读取数据库中的表信息

1
2
3
4
5
6
select
name as 'table_name',
create_date AS 'create_time',
modify_date AS 'update_time'
from sys.tables
where type = 'U'

SQL Server(version 2005)读取表中的列信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SELECT
a.name as 'column_name',
b.name as 'data_type',
COLUMNPROPERTY(a.id,a.name,'PRECISION') as 'data_length',
COLUMNPROPERTY(a.id,a.name,'Scale') as 'data_scale',
case when exists(SELECT 1 FROM sysobjects where xtype='PK' and parent_obj=a.id and name in (
SELECT name FROM sysindexes WHERE indid in( SELECT indid FROM sysindexkeys WHERE id = a.id AND colid=a.colid))) then 1 else 0 end as 'primary_key',
COLUMNPROPERTY(a.id,a.name,'IsIdentity') as 'autoincrement',
a.isnullable as 'nullable',
e.text as 'column_default',
g.value as 'column_comment'
FROM syscolumns a
left join systypes b on a.xusertype=b.xusertype
left join sysobjects d on a.id=d.id and d.xtype='U'
left join syscomments e on a.cdefault=e.id
left join sys.extended_properties g on a.id=g.major_id and a.colid=g.minor_id
where d.name='Role'
order by a.id, a.colorder

查询结果示例如下:

column_name data_type data_length data_scale primary_key autoincrement nullable column_default column_comment
id int 10 0 1 1 0
name nvarchar 50 0 0 0 名称
description nvarchar 300 0 0 1 描述

MySQL(version 5.7)读取数据库中的表信息

1
2
3
4
5
6
7
8
SELECT
TABLE_NAME AS 'table_name',
TABLE_ROWS AS 'table_rows',
CREATE_TIME AS 'create_time',
UPDATE_TIME AS 'update_time'
FROM
information_schema.TABLES
WHERE TABLE_SCHEMA ='test'

其中TABLE_SCHEMA是数据库名。
查询结果示例如下:

table_name table_rows create_time update_time
goods 5 2020/1/15 17:10 2020/1/15 17:10

其中update_time是表结构的更新时间,而不是表数据的更新时间,而且我测试下来create_time也会跟着变化不知为何。

MySQL(version 5.7)读取表中的列信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SELECT
COLUMN_NAME as 'column_name',
DATA_TYPE as 'data_type',
IFNULL(CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION) as 'data_length',
NUMERIC_SCALE as 'data_scale',
COLUMN_TYPE as 'column_type',
IF(COLUMN_KEY='PRI', 1, 0) as 'primary_key',
IF(EXTRA='auto_increment', 1, 0) as 'autoincrement',
IF(IS_NULLABLE='YES', 1, 0) as 'nullable',
COLUMN_DEFAULT as 'column_default',
COLUMN_COMMENT as 'column_comment'
FROM
information_schema.COLUMNS
WHERE TABLE_SCHEMA ='test' and TABLE_NAME = 'goods'

其中TABLE_SCHEMA是数据库名,TABLE_NAME是表名。
查询结果示例如下:

column_name data_type data_length data_scale column_type primary_key autoincrement nullable column_default column_comment
id int 10 0 int(11) 1 1 0
name varchar 100 varchar(100) 0 0 1 名称
price decimal 18 2 decimal(18,2) 0 0 0 0 价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# -*- coding: utf-8 -*-
from redis import Redis
import socket
import os
import threading
import time
import uuid
from contextlib import ExitStack, contextmanager

# 加锁时间片
# 对于超过一个加锁时间片的锁应该重复使用expire设置过期时间
# 为的是在程序异常结束时锁能被尽快自动释放,所以时间片不应太长,
# 也不应设置过短,不然一是会频繁的访问redis续过期时长导致资源浪费,
# 二是在很卡的时候负责续时的线程可能无法正常续时。
# 建议设置在5~20秒
LOCK_TIME_SLICE = 10


class KeepLockThread(threading.Thread):

def __init__(self, redis, key_name):
super().__init__()
self.redis = redis
self.key_name = key_name

self.terminated = False
self.lock = threading.Lock()

def run(self):
while True:
time.sleep(LOCK_TIME_SLICE//3+1)

with self.lock:
if not self.terminated:
self.redis.expire(self.key_name, LOCK_TIME_SLICE)
else:
return

def stop(self):
with self.lock:
self.terminated = True


# 互斥锁
class Mutex:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.key_name = "MUTEX_" + name
hostname = socket.gethostname()
pid = os.getpid()
tid = threading.get_ident()
# 得到唯一ID,主机名 PID TID 是为了调试用
self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"
self.redis = Redis(host=server)
self.thread = None

# blocking: 是否阻塞
# shortlived: 是否短暂的加锁,如果是短暂的则不会创建额外的线程维护过期时间,所谓短暂就是确定加锁时间小于LOCK_TIME_SLICE
# 加锁时间是从获得锁到释放锁的时间,不包括阻塞等待的时间,可以阻塞等待很长时间仍然是短暂的加锁。
def acquire(self, blocking=True, shortlived=False):
r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
if blocking:
while not r:
time.sleep(0.05)
r = self.redis.set(self.key_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
if r:
if not shortlived:
# 如果加锁成功了,那么应该创建线程维护过期时间
self.thread = KeepLockThread(self.redis, self.key_name)
self.thread.daemon = True
self.thread.start()
return r

def release(self):
if self.acquired():
if self.thread:
self.thread.stop()
self.thread = None
self.redis.delete(self.key_name)

def acquired(self):
r = self.redis.get(self.key_name)
if r is not None and r.decode() == str(self.id):
return True
else:
return False

def __enter__(self):
self.acquire()

def __exit__(self, exc_type, exc_value, exc_trackback):
self.release()
if exc_value is not None:
raise exc_value


def mutex(names, server="127.0.0.1"):
def medium(func):
def wrapper(*args, **kw):
if type(names) == list:
with ExitStack() as stack:
for name in names:
stack.enter_context(Mutex(name))
return func(*args, **kw)
else:
with Mutex(names, server):
return func(*args, **kw)
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
return medium


class KeepReadLockThread(threading.Thread):

def __init__(self, redis, lock_id, rlock_name):
super().__init__()
self.redis = redis
self.id = lock_id
self.rlock_name = rlock_name

self.terminated = False
self.lock = threading.Lock()

def run(self):
while True:
time.sleep(LOCK_TIME_SLICE//3+1)

with self.lock:
if not self.terminated:
pipeline = self.redis.pipeline()
now = time.time()
pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
pipeline.execute()
else:
return

def stop(self):
with self.lock:
self.terminated = True


# 读写锁
class ReadWriteLock:

def __init__(self, name, server="127.0.0.1"):
self.name = name
self.server = server
self.rlock_name = "RLOCK_" + name
self.wlock_name = "WLOCK_" + name
self.meta_lock_name = "META_" + name

hostname = socket.gethostname()
pid = os.getpid()
tid = threading.get_ident()
# 得到唯一ID,主机名 PID TID 是为了调试用
self.id = f"{hostname}_{pid}_{tid}_{uuid.uuid4().hex}"

self.redis = Redis(host=server)
self.lock_type = None
self.thread = None

def acquire_read_lock(self, blocking=True, shortlived=False):
# 这个mutex用来维护读写锁内部数据本身使用,在离开ReadWriteLock函数前必须释放
mutex = Mutex(self.meta_lock_name, self.server)
try:
mutex.acquire(shortlived=True)
wlock_locked = self.redis.get(self.wlock_name)
if wlock_locked:
if blocking:
while wlock_locked:
mutex.release()
time.sleep(0.05)
mutex.acquire(shortlived=True)
wlock_locked = self.redis.get(self.wlock_name)
else:
return False

pipeline = self.redis.pipeline()
now = time.time()
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
pipeline.zremrangebyscore(self.rlock_name, 0, int((now-LOCK_TIME_SLICE)*1000) )
# 添加新的读锁
pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
pipeline.expire(self.rlock_name, LOCK_TIME_SLICE)
pipeline.execute()
self.lock_type = 'R'

if not shortlived:
# 创建线程维护读锁过期时间
self.thread = KeepReadLockThread(self.redis, self.id, self.rlock_name)
self.thread.daemon = True
self.thread.start()

return True
finally:
mutex.release()

def acquire_write_lock(self, blocking=True, shortlived=False):
mutex = Mutex(self.meta_lock_name, self.server)
try:
# 注意这里使用mutex时shortlived始终为True,它不是由外部使用读写锁是否短暂决定
# 而是加读写锁时内部需要用到mutex进行短暂加锁,以保证获取到读锁为0后到加写锁前读锁不会新增
mutex.acquire(shortlived=True)
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000) )
# 获取当前读锁数量
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire(shortlived=True)
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000) )
# 获取当前读锁数量
r = self.redis.zcard(self.rlock_name)
else:
return False
# 尝试获取写锁
r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
if blocking:
while not r:
mutex.release()
time.sleep(0.05)
# 没有获取到写锁,那么可能读锁又会被其他地方获取,所以要重新先等到读锁为0
mutex.acquire(shortlived=True)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000) )
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire(shortlived=True)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-LOCK_TIME_SLICE)*1000) )
r = self.redis.zcard(self.rlock_name)
else:
return False
r = self.redis.set(self.wlock_name, self.id, ex=LOCK_TIME_SLICE, nx=True)
if r:
if not shortlived:
self.thread = KeepLockThread(self.redis, self.wlock_name)
self.thread.daemon = True
self.thread.start()

self.lock_type = 'W'
return r
finally:
mutex.release()

def acquired(self, lock_type = "any"):
if lock_type == "any":
if self.lock_type is not None:
return True
else:
return False
else:
if self.lock_type == lock_type:
return True
else:
return False

def release(self):
if self.thread:
self.thread.stop()
self.thread = None
if self.lock_type == 'R':
self.redis.zrem(self.rlock_name, self.id)
self.lock_type = None
elif self.lock_type == 'W':
r = self.redis.get(self.wlock_name)
if r is not None and r.decode() == str(self.id):
self.redis.delete(self.wlock_name)
self.lock_type = None


@contextmanager
def read_lock(name, server="127.0.0.1"):
rwlock = ReadWriteLock(name=name)
try:
rwlock.acquire_read_lock()
yield rwlock
finally:
rwlock.release()


@contextmanager
def write_lock(name, server="127.0.0.1"):
rwlock = ReadWriteLock(name=name)
try:
rwlock.acquire_write_lock()
yield rwlock
finally:
rwlock.release()

Mutex支持阻塞和非阻塞加锁,默认阻塞,非阻塞用法。
考虑了锁住后崩溃,解决方案是利用redis设置过期时间,超时后自动删除,一次长时间的加锁会通过一个线程不断的续时,如果加锁后崩溃,锁会在一个时间片内的时间被自动释放。

又基于Mutex实现了一个ReadWriteLock,也支持阻塞和非阻塞,以及崩溃后自动释放。