1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| from redis import Redis import threading import time import uuid
class Mutex: def __init__(self, name, server="127.0.0.1"): self.name = name self.key_name = "MUTEX_" + name self.id = uuid.uuid4().hex self.redis = Redis(host=server)
def acquire(self, blocking=True, ex=120): r = self.redis.set(self.key_name, self.id, ex=ex, nx=True) if blocking: while not r: time.sleep(0.01) r = self.redis.set(self.key_name, self.id, ex=ex, nx=True) return r
def release(self): if self.acquired(): self.redis.delete(self.key_name)
def acquired(self): r = self.redis.get(self.key_name) return r != None and r.decode() == str(self.id)
def __enter__(self): self.acquire()
def __exit__(self, exc_type, exc_value, exc_trackback): self.release() if exc_value != None: raise exc_value
class ReadWriteLock: MAX_READ_TIME = 120 ''' 正常不会用到MAX_READ_TIME,在加锁后异常并且没有对解锁进行正确处理时,会导致锁始终被一个已崩溃的进程持有无法释放, 为了应对这种情况,通过设置MAX_READ_TIME,在加读锁时间超过MAX_READ_TIME没有解锁时自动解锁 '''
def __init__(self, name, server="127.0.0.1"): self.name = name self.server = server self.rlock_name = "RLOCK_" + name self.wlock_name = "WLOCK_" + name self.meta_lock_name = "META_" + name self.id = uuid.uuid4().hex self.redis = Redis(host=server) self.lock_type = None
def acquire_read_lock(self, blocking=True): mutex = Mutex(self.meta_lock_name, self.server) try: mutex.acquire() wlock_locked = self.redis.get(self.wlock_name) if wlock_locked: if blocking: while wlock_locked: mutex.release() time.sleep(0.05) mutex.acquire() wlock_locked = self.redis.get(self.wlock_name) else: return False pipeline = self.redis.pipeline() now = time.time() pipeline.zremrangebyscore(self.rlock_name, 0, int((now-self.MAX_READ_TIME)*1000) ) pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} ) pipeline.expire(self.rlock_name, self.MAX_READ_TIME) pipeline.execute() self.lock_type = 'R' return True finally: mutex.release() def acquire_write_lock(self, blocking=True, ex=120): mutex = Mutex(self.meta_lock_name, self.server) try: mutex.acquire() self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) ) r = self.redis.zcard(self.rlock_name) if r: if blocking: while r: mutex.release() time.sleep(0.05) mutex.acquire() self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) ) r = self.redis.zcard(self.rlock_name) else: return False r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True) if blocking: while not r: mutex.release() time.sleep(0.05) mutex.acquire() self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) ) r = self.redis.zcard(self.rlock_name) if r: if blocking: while r: mutex.release() time.sleep(0.05) mutex.acquire() self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-self.MAX_READ_TIME)*1000) ) r = self.redis.zcard(self.rlock_name) else: return False r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True) if r: self.lock_type = 'W' return r finally: mutex.release()
def acquired(self, lock_type = "any"): return self.lock_type is not None if lock_type == "any" else self.lock_type == lock_type
def release(self): if self.lock_type == 'R': self.redis.zrem(self.rlock_name, self.id) self.lock_type = None elif self.lock_type == 'W': r = self.redis.get(self.wlock_name) if r != None and r.decode() == str(self.id): self.redis.delete(self.wlock_name) self.lock_type = None
|