kafka-util

A wrapper around kafka-python that adds message deduplication: by default Kafka only guarantees that data is not lost, not that messages are never duplicated in edge cases. With the deduplication added here, messages are delivered without loss and without duplicates. It also adds the ability to subscribe starting from a specified point in time. A usage sketch follows the code listing below.
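Deduplication works by having the producer prefix every message value with a fixed-width 32-character id (a millisecond timestamp plus part of a random UUID); the consumer strips the prefix and keeps recently seen ids in a sliding window. A minimal sketch of that id format, mirroring what send() in the code below does:

import datetime
import uuid

# 17-character millisecond timestamp, e.g. "20240115103045123"
ts_part = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
# 15 random hex characters taken from a UUID
rand_part = uuid.uuid4().hex[:15]
msg_id = ts_part + rand_part             # always exactly 32 characters
wire_value = msg_id.encode() + b"hello"  # the consumer recovers the id with value[:32]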


import kafka # pip install kafka-python
import time
import uuid
import datetime
import queue
from dataclasses import dataclass

@dataclass
class Message:
    key: str = ""
    value: bytes = b""

class KafkaProducer(kafka.KafkaProducer):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def send(self, topic: str, key: str, value: bytes):
        # Generate a 32-character id used for deduplication; it also records when the message was produced.
        # By default Kafka guarantees that messages are not lost, but not that they are never duplicated,
        # so we deduplicate by this id ourselves.
        id = "{}{}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3], uuid.uuid4().hex[:15])
        super().send(topic, key=key.encode(), value=id.encode() + value)


def create_kafka_producer(server):
    retry_times = 3
    for i in range(1, retry_times + 1):
        try:
            producer = KafkaProducer(
                bootstrap_servers=[server],
                api_version=(2, 3, 1),
                api_version_auto_timeout_ms=5000
            )
            return producer
        except Exception as e:
            if i == retry_times:
                raise e
            else:
                time.sleep(0.1)


class KafkaConsumer(kafka.KafkaConsumer):
    def __init__(self, **kw):
        super().__init__(**kw)
        self.q = queue.Queue()
        self.s = set()
        self.DUP_DETECT_SIZE = 1000  # size of the deduplication window

    # Subscribing from the latest message onward needs nothing extra (subscribe() is implemented in the parent class)
    # def subscribe(self, topic):
    #     pass

    # Subscribe starting from a historical point in time (which messages are still available depends on the
    # broker's retention policy; with the current configuration, messages from the last 72 hours can be re-consumed).
    def subscribe_from_datetime(self, partition, topic, dt):
        if type(dt) is int or type(dt) is float:
            ts = dt
        elif isinstance(dt, datetime.datetime):
            ts = dt.timestamp()
        else:
            ts = time.mktime(time.strptime(f"{dt}", r"%Y-%m-%d %H:%M:%S"))
        offset = self._get_offset_for_time(partition, topic, ts)
        tp = kafka.TopicPartition(topic, partition)
        super().assign([tp])
        super().seek(tp, offset)

    def __iter__(self):
        return self

    # Re-wrap the blocking message retrieval and add deduplication.
    def __next__(self):
        while True:
            message = super().__next__()
            key = message.key
            if not key:
                continue
            id = message.value[:32]
            if id in self.s:  # duplicate message, skip it
                continue
            if len(self.s) >= self.DUP_DETECT_SIZE:
                # window full: evict the oldest id so the set stays bounded
                e = self.q.get()
                self.s.remove(e)
            self.s.add(id)
            self.q.put(id)

            return Message(key=key.decode(), value=message.value[32:])

    # Re-wrap the non-blocking message retrieval and add deduplication.
    def get_messages(self, max_records=20):
        r = super().poll(timeout_ms=max_records * 25, max_records=max_records)
        ret = []
        for messages in r.values():
            for message in messages:
                key = message.key
                if not key:
                    continue
                id = message.value[:32]
                if id in self.s:  # duplicate message, skip it
                    continue
                if len(self.s) >= self.DUP_DETECT_SIZE:
                    e = self.q.get()
                    self.s.remove(e)
                self.s.add(id)
                self.q.put(id)

                ret.append(Message(key=key.decode(), value=message.value[32:]))
        return ret

    def _get_latest_offset(self, partition, topic):
        tp = kafka.TopicPartition(topic, partition)
        super(KafkaConsumer, self).assign([tp])
        off_set_dict = super(KafkaConsumer, self).end_offsets([tp])
        return list(off_set_dict.values())[0]

    def _get_offset_for_time(self, partition, topic, ts):
        tp = kafka.TopicPartition(topic, partition)
        super(KafkaConsumer, self).assign([tp])
        offset_dict = super(KafkaConsumer, self).offsets_for_times({tp: int(ts * 1000)})
        offset = list(offset_dict.values())[0]
        if offset is None:
            return self._get_latest_offset(partition, topic)
        else:
            return offset.offset


def create_kafka_consumer(server, group_id):
    retry_times = 3
    for i in range(1, retry_times + 1):
        try:
            consumer = KafkaConsumer(
                bootstrap_servers=[server],
                group_id=group_id,
                auto_offset_reset='latest',  # earliest, latest
                enable_auto_commit=True,
                api_version=(2, 3, 1),
                api_version_auto_timeout_ms=5000
            )
            return consumer
        except Exception as e:
            if i == retry_times:
                raise e
            else:
                time.sleep(0.1)
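
A minimal usage sketch building on the module above; the broker address, topic name, partition and group id are placeholders:

producer = create_kafka_producer("localhost:9092")
producer.send("test-topic", key="device-1", value=b"hello")
producer.flush()

consumer = create_kafka_consumer("localhost:9092", group_id="demo-group")
# Re-consume partition 0 starting from one hour ago (subject to the broker's retention policy)
consumer.subscribe_from_datetime(0, "test-topic", datetime.datetime.now() - datetime.timedelta(hours=1))

# Blocking, deduplicated iteration
for msg in consumer:
    print(msg.key, msg.value)

# Or non-blocking, deduplicated polling:
# batch = consumer.get_messages(max_records=20)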