A wrapper around kafka-python

With its default configuration, Kafka guarantees that every message is consumed at least once: nothing is lost, but duplicates are possible. The code below adds deduplication on the consumer side and supports subscribing to historical messages starting from an arbitrary point in time.

utils/kafka_util.py

# -*- coding: utf-8 -*-
import kafka
import time
import datetime
import uuid
from queue import Queue


class KafkaProducer(kafka.KafkaProducer):
    def __init__(self, server):
        super().__init__(
            bootstrap_servers=[server],
            api_version=(2, 3, 1),
            api_version_auto_timeout_ms=5000
        )

    # A topic can be thought of as a message channel and must match the topic used when subscribing.
    # The key has no fixed meaning; define it as needed, e.g. as a message type that consumers later
    # dispatch on. Kafka guarantees that messages with the same key land on the same partition of the
    # cluster, which preserves ordering among messages sharing a key.
    def send(self, topic, key, data):
        assert isinstance(data, bytes)
        # Build a 32-character unique prefix from the current time plus a UUID fragment; it is used
        # for deduplication and makes the send time visible when debugging.
        timestamp = "{}{}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3], uuid.uuid4().hex[:15])
        timestamp = timestamp[:32]
        value = timestamp.encode() + data
        return super().send(topic, key=key.encode(), value=value)


class KafkaConsumer(kafka.KafkaConsumer):
    def __init__(self, server, group_id=None):
        super().__init__(
            bootstrap_servers=[server],
            group_id=group_id,
            auto_offset_reset='latest',  # earliest, latest
            enable_auto_commit=True,
            api_version=(2, 3, 1),
            api_version_auto_timeout_ms=5000
        )
        self.server = server
        self.q = Queue()
        self.s = set()
        self.DUP_DETECT_SIZE = 1000  # size of the deduplication window

    # Subscribe starting from the latest messages.
    def subscribe(self, topic):
        if topic not in super().topics():
            # Create a temporary producer and send an empty message so the topic gets created;
            # the key is empty, so the consumer will discard it.
            # This should not be necessary and may work around a bug: if you subscribe to a topic
            # that does not exist yet and messages are sent to it later, the consumer may not
            # receive them. Later versions may have fixed this; it is unclear whether the issue is
            # in kafka-python or in Kafka itself.
            producer = kafka.KafkaProducer(
                bootstrap_servers=[self.server],
                api_version=(2, 3, 1),
                api_version_auto_timeout_ms=5000
            )
            producer.send(topic, key=b"", value=b"")
            producer.flush()
        super().subscribe(topic)

    # Subscribe from a point in time in the past. Which messages are still available depends on the
    # retention policy configured on the Kafka server; with the current configuration, messages from
    # the last 72 hours can be consumed again.
    def subscribe_from_datetime(self, topic, dt):
        if topic not in super().topics():
            # Create a temporary producer and send an empty message so the topic gets created;
            # the key is empty, so the consumer will discard it.
            producer = kafka.KafkaProducer(
                bootstrap_servers=[self.server],
                api_version=(2, 3, 1),
                api_version_auto_timeout_ms=5000
            )
            producer.send(topic, key=b"", value=b"")
            producer.flush()
        if type(dt) is int or type(dt) is float:
            ts = dt
        elif isinstance(dt, datetime.datetime):
            ts = dt.timestamp()
        else:
            ts = time.mktime(time.strptime(f"{dt}", r"%Y-%m-%d %H:%M:%S"))
        offset = self._get_offset_for_time(topic, ts)
        partition = 0
        tp = kafka.TopicPartition(topic, partition)
        super().assign([tp])
        super().seek(tp, offset)

    def __iter__(self):
        return self

    # Blocking retrieval, re-wrapped to add deduplication.
    # Each iteration yields a (key, data, timestamp) tuple.
    def __next__(self):
        while True:
            message = super().__next__()
            msg_type = message.key
            if not msg_type or len(message.value) < 32:
                continue
            timestamp = message.value[:32].decode()
            if timestamp in self.s:  # duplicate message
                continue
            if len(self.s) >= self.DUP_DETECT_SIZE:
                e = self.q.get()
                self.s.remove(e)
            self.s.add(timestamp)
            self.q.put(timestamp)

            data = message.value[32:]
            return (msg_type.decode(), data, timestamp)

    # Non-blocking retrieval, re-wrapped to add deduplication.
    # Returns a list of (key, data, timestamp) tuples.
    def get_messages(self, max_records=20):
        r = super().poll(timeout_ms=max_records * 25, max_records=max_records)
        ret = []
        for messages in r.values():
            for message in messages:
                msg_type = message.key
                if not msg_type or len(message.value) < 32:
                    continue
                timestamp = message.value[:32].decode()
                if timestamp in self.s:  # duplicate message
                    continue
                if len(self.s) >= self.DUP_DETECT_SIZE:
                    e = self.q.get()
                    self.s.remove(e)
                self.s.add(timestamp)
                self.q.put(timestamp)

                data = message.value[32:]
                ret.append((msg_type.decode(), data, timestamp))
        return ret

    def _get_latest_offset(self, topic):
        partition = 0
        tp = kafka.TopicPartition(topic, partition)
        super().assign([tp])
        off_set_dict = super().end_offsets([tp])
        return list(off_set_dict.values())[0]

    def _get_offset_for_time(self, topic, ts):
        partition = 0
        tp = kafka.TopicPartition(topic, partition)
        super().assign([tp])
        offset_dict = super().offsets_for_times({tp: int(ts * 1000)})
        offset = list(offset_dict.values())[0]
        if offset is None:
            return self._get_latest_offset(topic)
        else:
            return offset.offset
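
A minimal usage sketch of the blocking interface (the broker address and topic name are placeholders): subscribe_from_datetime replays messages from a past point in time, and iterating over the consumer yields deduplicated (key, data, timestamp) tuples.

import datetime
from utils.kafka_util import KafkaConsumer

consumer = KafkaConsumer(server="192.168.1.99")  # placeholder broker address

# Replay everything published in the last hour (bounded by the broker's retention policy).
start = datetime.datetime.now() - datetime.timedelta(hours=1)
consumer.subscribe_from_datetime("topic-test", start)

# Blocking iteration: each element is already deduplicated by the wrapper.
for (key, data, timestamp) in consumer:
    print(key, data, timestamp)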

In practice, pair the wrapper with a serialization format such as json or msgpack. If you use json, the to_jsonable function below may be useful: it lets dates, times, and custom class instances be serialized through json.

import time
import datetime
from utils.kafka_util import KafkaProducer
import json
import decimal


# Recursively convert an object into something json.dumps can serialize.
def to_jsonable(o):
    if o is None:
        return o
    if isinstance(o, int) or isinstance(o, float) or isinstance(o, str) or isinstance(o, bool):
        return o
    if isinstance(o, list) or isinstance(o, tuple) or isinstance(o, set):
        return [to_jsonable(e) for e in o]
    if isinstance(o, dict):
        return {k: to_jsonable(v) for (k, v) in o.items()}
    if isinstance(o, datetime.datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(o, datetime.date):
        return o.strftime('%Y-%m-%d')
    if isinstance(o, datetime.time):
        return o.strftime('%H:%M:%S')
    if isinstance(o, decimal.Decimal):
        return float(o)
    try:
        # Fall back to the object's attribute dict for custom classes.
        return {k: to_jsonable(v) for k, v in vars(o).items()}
    except Exception:
        return o


producer = KafkaProducer(server="192.168.1.99")


class A:
    def __init__(self):
        self.x = 1
        self.y = 3.14


class B:
    def __init__(self):
        self.x = A()
        self.y = datetime.datetime.now()


producer.send("topic-test", "obj", json.dumps(to_jsonable(B())).encode())
time.sleep(1)  # give the background sender time to deliver before the script exits
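
If you prefer msgpack over json, a minimal sketch (assuming the msgpack package is installed; topic and key names are placeholders):

import msgpack
from utils.kafka_util import KafkaProducer

producer = KafkaProducer(server="192.168.1.99")
payload = {"x": 1, "y": 3.14}
# msgpack.packb already returns bytes, so no .encode() is needed.
producer.send("topic-test", "obj-msgpack", msgpack.packb(payload))
producer.flush()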

If the key records the type of the object being sent, the receiver can reconstruct the object (a sketch of this follows the consumer example below).

from utils.kafka_util import KafkaConsumer
import time
import json


consumer = KafkaConsumer(server="192.168.1.99")
consumer.subscribe("topic-test")
while True:
    messages = consumer.get_messages()
    for (key, data, timestamp) in messages:
        print(key, json.loads(data), timestamp)
    time.sleep(0.1)
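
Since the key identifies the type of object that was sent, the receiver can rebuild an object from the JSON payload. A hedged sketch, assuming the "obj" key and the to_jsonable producer example above; from_jsonable and the use of SimpleNamespace are illustrative stand-ins, not part of the wrapper:

import json
from types import SimpleNamespace
from utils.kafka_util import KafkaConsumer


# Illustrative helper: rebuild nested dicts as attribute-style objects.
def from_jsonable(d):
    if isinstance(d, dict):
        return SimpleNamespace(**{k: from_jsonable(v) for k, v in d.items()})
    if isinstance(d, list):
        return [from_jsonable(e) for e in d]
    return d


consumer = KafkaConsumer(server="192.168.1.99")
consumer.subscribe("topic-test")
for (key, data, timestamp) in consumer:
    if key == "obj":  # dispatch on the key recorded by the sender
        obj = from_jsonable(json.loads(data))
        print(obj.x.x, obj.x.y, obj.y)  # fields of B as serialized by the producer example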