基于Redis的封装消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

import redis
from dataclasses import dataclass

@dataclass
class Message:
key: str = ""
value: bytes = b""

class RedisMQProducer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)

def send(self, topic: str, key: str, value: bytes):
redis_key = f"MQ_{topic}"
if len(key) >= 32:
raise Exception("key超长")
message = key.encode() + (b'\0' * (32 - len(key))) + value
self.client.lpush(redis_key, message)


def create_redis_mq_producer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQProducer(host=host, port=port, db=0)


class RedisMQConsumer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)
self.topics = []


def subscribe(self, topics):
self.topics = topics

def __iter__(self):
self.idx = 0
return self

# 阻塞地获取下一个新消息
def __next__(self):
if not self.topics:
raise StopIteration
while True:
topic = self.topics[self.idx]
self.idx = (self.idx + 1) % len(self.topics)
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
return Message(key=data[:32].decode().strip('\0'), value=data[32:])

# 获取新消息并返回,直到所有订阅的topic中都没有新消息或者获取到了数量max_records个消息
def get_messages(self, max_records=20):
ret = []
topics = self.topics
while topics:
topics_bak = topics
topics = []
for topic in topics_bak:
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
try:
ret.append(Message(key=data[:32].decode().strip('\0'), value=data[32:]))
except Exception as e:
pass
topics.append(topic)
if len(ret) >= max_records:
return ret
return ret


def create_redis_mq_consumer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQConsumer(host=host, port=port, db=0)