Processing messages within a time window with Kafka

Question

This code:

from confluent_kafka import Consumer, KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
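    # Topic-level settings can be given at the top level in current confluent-kafka;
    # 'earliest' is the current name for the legacy value 'smallest'
    'auto.offset.reset': 'earliest'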
}

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occurred: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
    pass

finally:
    c.close()

Taken from https://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers

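I'm trying to update this code so that it polls the topic once per second but computes statistics over all messages received within a 1-minute window.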
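As a rough illustration of the "statistics" part only: the question does not say which statistics are needed, so this sketch assumes each message value has already been decoded to a number and computes count, min, max and mean for one window. `summarize_window` is a hypothetical helper name.

```python
from statistics import mean


def summarize_window(values):
    """Summary statistics for the numeric values collected during one window.

    Assumes the message payloads were already decoded to numbers.
    """
    if not values:
        return {"count": 0}  # an empty window has no min/max/mean
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": mean(values),
    }
```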

This is how I plan to do it:

Replace msg = c.poll(0.1) with msg = c.poll(1).
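The argument to `poll()` is a timeout, the maximum time to block, not a polling interval: `c.poll(1)` returns as soon as a message is available and only waits the full second when nothing arrives. A minimal sketch, assuming the loop keeps track of the end of the current window in seconds (a hypothetical `window_end` value), caps each wait so that no single call blocks past the window boundary; `poll_timeout` is a hypothetical helper.

```python
def poll_timeout(now_s, window_end_s, max_wait_s=1.0):
    """Return a timeout for poll(): at most max_wait_s, never past window_end_s.

    Returns 0.0 once the window end has been reached, so poll() does not block.
    """
    remaining = window_end_s - now_s
    return min(max_wait_s, max(0.0, remaining))
```

In a loop this might be used as `msg = c.poll(poll_timeout(time.monotonic(), window_end))`, with `window_end` advanced by 60 seconds each time a window is processed.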

Introduce a new variable i that holds the current message count for a given minute.
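A counter like `i` measures loop iterations, not time. One alternative, sketched here with a hypothetical `WindowTimer` class, compares a monotonic clock (unaffected by system clock changes) against the window's end time. The clock is injectable, an assumption made so the timer can be exercised without waiting a real minute.

```python
import time


class WindowTimer:
    """Tracks a fixed-length window in elapsed time rather than loop iterations."""

    def __init__(self, length_s=60.0, clock=time.monotonic):
        self.length_s = length_s
        self.clock = clock  # injectable so tests can use a fake clock
        self.window_end = clock() + length_s

    def expired(self):
        """True once the current window's end time has been reached."""
        return self.clock() >= self.window_end

    def advance(self):
        """Move window_end forward in whole windows until it lies in the future.

        Windows that elapsed entirely without a check are skipped.
        """
        now = self.clock()
        while self.window_end <= now:
            self.window_end += self.length_s
```

In the question's loop, `if i == 60:` would become `if timer.expired():`, followed by the processing step and `timer.advance()`.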

Create a new class SharedQueue to store the data to be processed:

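from collections import deque

class SharedQueue:

    def __init__(self, maxlen=1000000):
        # Bounded buffer; once maxlen items are stored, the oldest are discarded
        self.data_queue = deque(maxlen=maxlen)

    def append_data_queue(self, msg):
        self.data_queue.append(msg)

    def get_data_queue(self):
        # Return the buffered messages themselves
        return self.data_queue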
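A `deque(maxlen=...)` bounds the buffer by number of items, not by age, so on its own it cannot answer "what arrived in the last minute". A minimal sliding-window sketch, assuming each entry carries a millisecond timestamp (for example the second element of the tuple returned by `msg.timestamp()`), evicts entries older than the window instead; `SlidingWindow` is a hypothetical name.

```python
from collections import deque


class SlidingWindow:
    """Holds (timestamp_ms, value) pairs and keeps only those from the last window_ms."""

    def __init__(self, window_ms=60_000):
        self.window_ms = window_ms
        self.items = deque()  # unbounded; age-based eviction keeps it small

    def append(self, timestamp_ms, value):
        self.items.append((timestamp_ms, value))

    def evict(self, now_ms):
        # Assumes entries are appended in (roughly) timestamp order, so the
        # oldest entry is always at the left end of the deque.
        cutoff = now_ms - self.window_ms
        while self.items and self.items[0][0] <= cutoff:
            self.items.popleft()

    def values(self, now_ms):
        """Values newer than now_ms - window_ms; older entries are evicted first."""
        self.evict(now_ms)
        return [value for _, value in self.items]
```

Calling `values(now_ms)` once per second from the poll loop gives a sliding one-minute view; for non-overlapping per-minute statistics a tumbling window is usually simpler.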

With these changes, the code becomes:

from confluent_kafka import Consumer, KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
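    # Topic-level settings can be given at the top level in current confluent-kafka;
    # 'earliest' is the current name for the legacy value 'smallest'
    'auto.offset.reset': 'earliest'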
}

sq = SharedQueue()  # the SharedQueue class defined above

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    i = 0
    while True:
        i = i + 1
        msg = c.poll(1)
        if msg is None:
            # Nothing arrived within the 1 s timeout; still reach the window check below
            pass
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
            sq.append_data_queue(msg)  # queue only successfully received messages
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occurred: {0}'.format(msg.error().str()))
        if i == 60:
            # Process the messages queued during the last 60 polls
            i = 0

except KeyboardInterrupt:
    pass

finally:
    c.close()

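But this is not a good solution, because poll can return immediately when records are available, so 60 loop iterations do not necessarily correspond to 60 seconds.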
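One way around this, sketched here under stated assumptions, is to drive the window by elapsed wall-clock time and let the number of `poll()` calls vary. The hypothetical `consume_in_windows` function accepts any `poll(timeout)` callable that returns a message or `None`, mirroring how `Consumer.poll` is used in the question, plus an injectable clock so it can be tested without a broker.

```python
import time


def consume_in_windows(poll, handle_window, window_s=60.0,
                       clock=time.monotonic, max_windows=None):
    """Collect messages from poll() into consecutive window_s-second windows.

    poll(timeout) must return a message or None after blocking for at most
    `timeout` seconds. handle_window(batch) is called once per window with
    the list of messages received during it (possibly empty). Runs forever
    unless max_windows is given.
    """
    batch = []
    window_end = clock() + window_s
    windows_done = 0
    while max_windows is None or windows_done < max_windows:
        # Wait at most 1 s per call, and never beyond the end of the window.
        timeout = min(1.0, max(0.0, window_end - clock()))
        msg = poll(timeout)
        if msg is not None:
            batch.append(msg)
        if clock() >= window_end:
            handle_window(batch)
            batch = []
            window_end += window_s  # fixed grid: window boundaries do not drift
            windows_done += 1
```

With a real consumer, `poll` could be a small wrapper around `c.poll` that returns `None` for messages whose `error()` is set, and `handle_window` would compute the per-minute statistics. Because processing runs inside the poll loop, a slow `handle_window` delays the next `poll()`; if that delay exceeds `max.poll.interval.ms`, the consumer is considered failed and its partitions are reassigned.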

How can I process the messages received within a time window? Am I on the right track with the queue?
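If "within a time window" should mean when the messages were produced rather than when this consumer happened to receive them, the window can be keyed on the record timestamp instead: in confluent_kafka, `msg.timestamp()` returns a `(timestamp_type, timestamp_ms)` tuple, where the type should be checked against `TIMESTAMP_NOT_AVAILABLE`. A minimal tumbling-window sketch over such millisecond timestamps, assuming they arrive roughly in order, with a hypothetical `TumblingCounter` class:

```python
WINDOW_MS = 60_000  # one-minute tumbling windows, in milliseconds


class TumblingCounter:
    """Counts records per event-time window [start, start + window_ms).

    A window is reported as closed when the first record belonging to a later
    window arrives. Records older than the current window (late arrivals) are
    simply counted in the current window; a production version would need an
    explicit policy for them.
    """

    def __init__(self, window_ms=WINDOW_MS):
        self.window_ms = window_ms
        self.current_start = None  # start of the window being filled
        self.count = 0

    def add(self, timestamp_ms):
        """Record one message; return (window_start, count) if a window just closed."""
        start = timestamp_ms - timestamp_ms % self.window_ms
        closed = None
        if self.current_start is None:
            self.current_start = start
        elif start > self.current_start:
            closed = (self.current_start, self.count)
            self.current_start = start
            self.count = 0
        self.count += 1
        return closed
```

The last window stays open until a record from a later minute arrives, so a quiet topic may also need a wall-clock flush.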

Tags: python, apache-kafka


