1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
import threading
import time
import uuid

from redis import Redis
class Mutex:
    """Distributed mutual-exclusion lock stored in a single Redis key.

    The lock is the key ``"MUTEX_" + name``. Its value is a random
    per-instance token, so an instance can tell whether the key currently
    belongs to it. The key is written with a TTL, so a lock whose holder
    never releases it disappears once the TTL elapses.

    Usable as a context manager::

        with Mutex("jobs") as lock:
            ...
    """

    # Compare-and-delete executed server-side as one atomic step. Doing a
    # GET followed by a DELETE from the client leaves a window in which the
    # key can expire and be re-acquired by another client, whose lock would
    # then be deleted by mistake.
    _RELEASE_SCRIPT = (
        'if redis.call("get", KEYS[1]) == ARGV[1] then '
        'return redis.call("del", KEYS[1]) '
        'else return 0 end'
    )

    def __init__(self, name, server="127.0.0.1"):
        """Create a lock handle for ``name`` on the Redis host ``server``."""
        self.name = name
        self.key_name = "MUTEX_" + name
        # Random token identifying this instance as the owner of the key.
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)

    def acquire(self, blocking=True, ex=120):
        """Try to take the lock.

        Args:
            blocking: When true, retry every 10 ms until the lock is taken.
                When false, make a single attempt.
            ex: TTL in seconds applied to the lock key.

        Returns:
            True if the lock was taken, False if a non-blocking attempt
            found the key already set.
        """
        while True:
            # SET ... NX only succeeds when the key does not exist yet.
            if self.redis.set(self.key_name, self.id, ex=ex, nx=True):
                return True
            if not blocking:
                return False
            time.sleep(0.01)

    def release(self):
        """Delete the lock key, but only if it still holds this instance's token."""
        self.redis.eval(self._RELEASE_SCRIPT, 1, self.key_name, self.id)

    def acquired(self):
        """Return True if the lock key currently holds this instance's token."""
        holder = self.redis.get(self.key_name)
        if holder is None:
            return False
        if isinstance(holder, bytes):
            holder = holder.decode()
        return holder == str(self.id)

    def __enter__(self):
        """Block until the lock is taken and return this instance."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_trackback):
        """Release the lock; any exception from the ``with`` body propagates."""
        self.release()
        # A falsy return tells Python to re-raise the in-flight exception
        # itself, with its original traceback intact.
        return False
class ReadWriteLock:
    """Distributed reader/writer lock built on Redis.

    State kept in Redis for a lock called ``name``:

    * ``"RLOCK_" + name`` - sorted set of active readers. Each member is a
      reader's instance token, scored with the millisecond timestamp at
      which it took the read lock. Entries older than ``ex`` seconds are
      pruned, so a reader that never unlocks stops blocking writers.
    * ``"WLOCK_" + name`` - string key holding the writer's instance token,
      written with a TTL of ``ex`` seconds.

    Every check-and-update of those keys runs while holding a ``Mutex``
    named ``name``, which is released while waiting so other clients can
    make progress.
    """

    # Seconds to sleep between attempts while waiting for the lock.
    _POLL_INTERVAL = 0.05

    # Atomic compare-and-delete for the writer key. A client-side GET then
    # DELETE could remove a key that expired and was re-taken by another
    # writer between the two commands.
    _WRITE_UNLOCK_SCRIPT = (
        'if redis.call("get", KEYS[1]) == ARGV[1] then '
        'return redis.call("del", KEYS[1]) '
        'else return 0 end'
    )

    def __init__(self, name, server="127.0.0.1"):
        """Create a lock handle for ``name`` on the Redis host ``server``."""
        self.name = name
        self.server = server
        self.rlock_name = "RLOCK_" + name
        self.wlock_name = "WLOCK_" + name
        # Random token identifying this instance as a reader or the writer.
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)
        # 'R' or 'W' while this instance holds a lock, otherwise None.
        self.lock_type = None

    def read_lock(self, blocking=True, ex=120):
        """Take a shared (read) lock.

        Args:
            blocking: When true, wait until no writer key exists. When
                false, return False immediately if a writer key exists.
            ex: Seconds after which a reader entry is treated as stale;
                also the TTL set on the reader set.

        Returns:
            True if the read lock was taken, False if a non-blocking
            attempt found a writer.
        """
        mutex = Mutex(self.name, self.server)
        mutex.acquire()
        try:
            while self.redis.get(self.wlock_name):
                if not blocking:
                    return False
                # Let other clients in while we wait for the writer.
                mutex.release()
                time.sleep(self._POLL_INTERVAL)
                mutex.acquire()

            now = time.time()
            pipeline = self.redis.pipeline()
            # Drop readers whose entry is older than ``ex`` seconds.
            pipeline.zremrangebyscore(self.rlock_name, 0, int((now - ex) * 1000))
            pipeline.zadd(self.rlock_name, {self.id: int(now * 1000)})
            pipeline.expire(self.rlock_name, ex)
            pipeline.execute()
            self.lock_type = 'R'
            return True
        finally:
            mutex.release()

    def write_lock(self, blocking=True, ex=120):
        """Take the exclusive (write) lock.

        The lock is granted only when, in the same mutex-protected step,
        the reader set is empty and the writer key can be created. Both
        conditions are re-evaluated on every retry, so a reader that
        registered while this call was sleeping is never overlooked.

        Args:
            blocking: When true, retry until the lock is taken. When false,
                make a single attempt.
            ex: TTL in seconds for the writer key; also the age after which
                reader entries are pruned as stale.

        Returns:
            True if the write lock was taken, False if a non-blocking
            attempt found active readers or another writer.
        """
        mutex = Mutex(self.name, self.server)
        mutex.acquire()
        try:
            while True:
                stale_before = int((time.time() - ex) * 1000)
                self.redis.zremrangebyscore(self.rlock_name, 0, stale_before)
                no_readers = not self.redis.zcard(self.rlock_name)
                if no_readers and self.redis.set(
                    self.wlock_name, self.id, ex=ex, nx=True
                ):
                    self.lock_type = 'W'
                    return True
                if not blocking:
                    return False
                # Let other clients in while we wait.
                mutex.release()
                time.sleep(self._POLL_INTERVAL)
                mutex.acquire()
        finally:
            mutex.release()

    def unlock(self):
        """Release whichever lock this instance holds; no-op if it holds none."""
        if self.lock_type == 'R':
            self.redis.zrem(self.rlock_name, self.id)
            self.lock_type = None
        elif self.lock_type == 'W':
            # Only removes the writer key if it still carries our token.
            self.redis.eval(
                self._WRITE_UNLOCK_SCRIPT, 1, self.wlock_name, self.id
            )
            self.lock_type = None
|