python-2.7 - 无法使用 trollius 在 python 2.7 的异步线程中处理 kafka-consumer 消息
问题描述
我是卡夫卡的新手,所以如果我错过了什么,请告诉我?用例如下,运行了三个Kafka消费者,从中读取的消息通过post调用客户端API进行处理和发送,但是客户端花费的时间(测试我用了8分钟)不是已修复,因此它也可以大于 max.poll.timeout(对于这种情况,我使用了 6 分钟)。为此,我试图在客户端发回响应时暂停()消费者,然后调用 resume()以使用 poll()从 kafka-broker 获取下一条消息。我尝试使用下面的代码在异步线程中进行处理,但这并没有异步执行,并且正在发生重新平衡,然后所有消费者都离开了该组。我该如何处理这个用例,为什么代码没有异步执行?
版本:
- 蟒蛇2.7
- 卡夫卡蟒1.4.6
- trollius 2.2.post1
import trollius as asyncio
from trollius import From
for message in consumer:
consumer.pause(*consumer.assignment())
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(thread_message_processor(consumer)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
.....
@asyncio.coroutine
def thread_message_processor(consumer):
# for testing I am using sleep as time taken by client API for processing
yield From(asyncio.sleep(480))
consumer.resume(*consumer.assignment())
解决方案
推荐阅读
- katalon-studio - 在一个测试对象中输入数据会在katalon studio中的另一个测试对象上输入值
- ios - 如何做 UITextView 滚动检测和更改图像
- python - scipy.optimize.minimize results differ between Python 2.x-3.x
- python - 有没有办法使用 Matplotlib 中的时间序列数据生成类方波?
- html - How to avoid different html element with same ID/Name while using dynamic name generation
- dart - Identity function in Dart?
- nfc - 如何在 gototags 或任何其他适用于 NFC 的 windows 软件中对考勤系统进行编程或编码?
- vb6 - VB6 和 windows 10 屏幕闪烁
- vhdl - 使用 MIF 文件的 VHDL 预加载 RAM 存储器
- firebase - 将值存储在 Firebase 中并在 React Native 中导航