python - 使用 Kafka 在时间窗口内处理消息
问题描述
这段代码:
from confluent_kafka import Consumer, KafkaError
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'client.id': 'client-1',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
c = Consumer(settings)
c.subscribe(['mytopic'])
try:
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
print('Received message: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
except KeyboardInterrupt:
pass
finally:
c.close()
取自https://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers
我正在尝试更新此代码,以便每秒轮询一次主题,但会处理 1 分钟窗口内所有消息的统计信息。
这就是我计划解决的方法:
替换msg = c.poll(0.1)
为msg = c.poll(1)
引入一个新变量i
,它将保持给定分钟的当前消息数。
创建一个新类 SharedQueue 来存储要处理的数据:
class SharedQueue:
data_queue = deque(maxlen=1000000)
def append_data_queue(self, msg):
self.data_queue.append(msg)
def get_data_queue(self, record_key, record_value, timestamp):
return self.append_data_queue
通过更改,代码变为:
from confluent_kafka import Consumer, KafkaError
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'client.id': 'client-1',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
sq = SharedQueue()
c = Consumer(settings)
c.subscribe(['mytopic'])
try:
i = 0
while True:
i = i + 1
msg = c.poll(1)
sq.append_data_queue(msg)
if msg is None:
continue
elif not msg.error():
print('Received message: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
if i == 60:
//process the last 60 items of the queue.
i = 0
except KeyboardInterrupt:
pass
但这不是一个好的解决方案,因为poll
如果有可用记录,可以立即返回。
如何实现对时间窗口内收到的消息的处理?我是否在正确的轨道上实施队列?
解决方案
推荐阅读
- python - 如何将 NSISO8601DateFormatOptions 与 PyObjC 一起使用
- excel - 当没有数据要计算而不使用自定义数字格式时如何隐藏 0
- qt - QML 中的条件拖放
- linux - 如何通过两个 SSH 连接使用 TensorBoard
- amazon-web-services - 将日志文件从 Container 同步到 S3 存储桶时出现问题
- javascript - 似乎无法在 blazor 中调用 javascript
- r - 按距离为 R 中的条件逻辑回归创建案例对照匹配
- omnet++ - 静脉 5.1 和 omnet 6.0pre10 中的 opp_run 错误
- canvas - 我想在 C++Builder 中的 TPanel 上绘图
- java - Java PathMatcher:为什么 ** 对零文件夹不起作用?