首页 > 解决方案 > 无法使用 trollius 在 python 2.7 的异步线程中处理 kafka-consumer 消息

问题描述

我是卡夫卡的新手,所以如果我错过了什么,请告诉我?用例如下,运行了三个Kafka消费者,从中读取的消息通过post调用客户端API进行处理和发送,但是客户端花费的时间(测试我用了8分钟)不是已修复,因此它也可以大于 max.poll.timeout(对于这种情况,我使用了 6 分钟)。为此,我试图在客户端发回响应时暂停()消费者,然后调用 resume()以使用 poll()从 kafka-broker 获取下一条消息。我尝试使用下面的代码在异步线程中进行处理,但这并没有异步执行,并且正在发生重新平衡,然后所有消费者都离开了该组。我该如何处理这个用例,为什么代码没有异步执行?

版本:

import trollius as asyncio
from trollius import From

for message in consumer:
    consumer.pause(*consumer.assignment())
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(thread_message_processor(consumer)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    .....


@asyncio.coroutine
def thread_message_processor(consumer):
    # for testing I am using sleep as time taken by client API for processing
    yield From(asyncio.sleep(480))
    consumer.resume(*consumer.assignment())

标签: python-2.7apache-kafkapython-asynciokafka-consumer-apikafka-python

解决方案


推荐阅读