1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| import redis from dataclasses import dataclass
@dataclass class Message: key: str = "" value: bytes = b""
class RedisMQProducer: def __init__(self, *args, **kw): self.client = redis.Redis(*args, **kw)
def send(self, topic: str, key: str, value: bytes): redis_key = f"MQ_{topic}" if len(key) >= 32: raise Exception("key超长") message = key.encode() + (b'\0' * (32 - len(key))) + value self.client.lpush(redis_key, message)
def create_redis_mq_producer(server): if server.find(':') != -1: host, port = server.split(":", 1) else: host = server port = 6379 return RedisMQProducer(host=host, port=port, db=0)
class RedisMQConsumer: def __init__(self, *args, **kw): self.client = redis.Redis(*args, **kw) self.topics = []
def subscribe(self, topics): self.topics = topics
def __iter__(self): self.idx = 0 return self
def __next__(self): if not self.topics: raise StopIteration while True: topic = self.topics[self.idx] self.idx = (self.idx + 1) % len(self.topics) redis_key = f"MQ_{topic}" data = self.client.rpop(redis_key) if data: return Message(key=data[:32].decode().strip('\0'), value=data[32:])
def get_messages(self, max_records=20): ret = [] topics = self.topics while topics: topics_bak = topics topics = [] for topic in topics_bak: redis_key = f"MQ_{topic}" data = self.client.rpop(redis_key) if data: try: ret.append(Message(key=data[:32].decode().strip('\0'), value=data[32:])) except Exception as e: pass topics.append(topic) if len(ret) >= max_records: return ret return ret
def create_redis_mq_consumer(server): if server.find(':') != -1: host, port = server.split(":", 1) else: host = server port = 6379 return RedisMQConsumer(host=host, port=port, db=0)
|