基于Redis的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-
from redis import Redis
import threading
import time
import uuid

# 互斥锁
class Mutex:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.key_name = "MUTEX_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)

def acquire(self, blocking=True, ex=120):
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
time.sleep(0.01)
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
return r

def release(self):
if self.acquired():
self.redis.delete(self.key_name)

def acquired(self):
r = self.redis.get(self.key_name)
return r != None and r.decode() == str(self.id)

def __enter__(self):
self.acquire()

def __exit__(self, exc_type, exc_value, exc_trackback):
self.release()
if exc_value != None:
raise exc_value


# 读写锁
class ReadWriteLock:
MAX_READ_TIME = 120 # 最长加读锁时间(秒)
'''
正常不会用到MAX_READ_TIME,在加锁后异常并且没有对解锁进行正确处理时,会导致锁始终被一个已崩溃的进程持有无法释放,
为了应对这种情况,通过设置MAX_READ_TIME,在加读锁时间超过MAX_READ_TIME没有解锁时自动解锁
'''

def __init__(self, name, server="127.0.0.1"):
self.name = name
self.server = server
self.rlock_name = "RLOCK_" + name
self.wlock_name = "WLOCK_" + name
self.meta_lock_name = "META_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)
self.lock_type = None

def acquire_read_lock(self, blocking=True):
# 这个mutex用来维护读写锁内部数据本身使用,在离开ReadWriteLock函数前必须释放
mutex = Mutex(self.meta_lock_name, self.server)
try:
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
if wlock_locked:
if blocking:
while wlock_locked:
mutex.release()
time.sleep(0.05)
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
else:
return False

pipeline = self.redis.pipeline()
now = time.time()
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
pipeline.zremrangebyscore(self.rlock_name, 0, int((now-self.MAX_READ_TIME)*1000) )
# 添加新的读锁
pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
pipeline.expire(self.rlock_name, self.MAX_READ_TIME)
pipeline.execute()
self.lock_type = 'R'
return True
finally:
mutex.release()

def acquire_write_lock(self, blocking=True, ex=120):
mutex = Mutex(self.meta_lock_name, self.server)
try:
mutex.acquire()
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )
# 获取当前读锁数量
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
# 移除一定时间前失效的读锁(主动处理异常未释放的读锁)
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )
# 获取当前读锁数量
r = self.redis.zcard(self.rlock_name)
else:
return False
# 尝试获取写锁
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )
r = self.redis.zcard(self.rlock_name)
else:
return False
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if r:
self.lock_type = 'W'
return r
finally:
mutex.release()

def acquired(self, lock_type = "any"):
return self.lock_type is not None if lock_type == "any" else self.lock_type == lock_type

def release(self):
if self.lock_type == 'R':
self.redis.zrem(self.rlock_name, self.id)
self.lock_type = None
elif self.lock_type == 'W':
r = self.redis.get(self.wlock_name)
if r != None and r.decode() == str(self.id):
self.redis.delete(self.wlock_name)
self.lock_type = None

支持阻塞和非阻塞加锁,默认阻塞,非阻塞用法。
考虑了锁住后崩溃,解决方案是超时后自动结束。
考虑了未崩溃但是超时(这种情况首先应该调整超时时间设置,或者程序应调整锁住的代码,例如多次分段锁)。
在这种异常情况发生时,可能产生一边释放了锁但还在访问,另一边加上了锁,记住这是异常情况,但我们要保证即使它发生了也尽量能正常工作下去,
对于确实存在访问冲突的那么是没办法的,该异常就异常好了,还有种情况是虽然加了锁,但是并没有访问冲突,其实程序可以正常下去,但是这里会发生什么呢?

1
2
3
4
对于A线程,手动加锁----------超时自动解锁|------------------------手动解锁|
对于B线程,-------------手动加锁-------|(等到此加锁成功)---------------------手动解锁|
对于C线程,-------------------------------|------------手动加锁-------|(如果在此加锁成功是错误的!)

A线程因为超时自动解锁后虽然和B线程没有发生访问冲突,但是它解了B线程的锁,导致C线程加锁成功,而B线程实际还没解锁,这又制造了潜在的B线程和C线程的访问冲突。
所以手动解锁时应该判断下当前的锁是否是自己加的。这就是acquired函数中r.decode() == str(self.id)存在的意义。

我们基于Mutex实现了一个ReadWriteLock,可以看到在Mutex加锁时最长加锁时间ex是在加锁时传入的,而ReadWriteLock的最长加读锁时间则是其类成员变量,而最长加写锁时间是每次传入的,这是因为对于Mutex来说每次加锁的时间可以是不同的,而ReadWriteLock加读锁需要依赖

1
redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) )

来释放因异常而过期的锁,这时的过期时间相当于是对所有之前加过的读锁而言的,所以比较简单的方式就是统一加读锁的最长时间。