python - 是否有用于事件驱动的 Kafka 消费者的 Python API?
问题描述
我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发,并通过将消息推送回 Kafka 流来响应。
我一直在寻找类似 Spring 实现的东西:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}
我看过:
但我在 Python 中找不到与事件驱动的实现风格相关的任何内容。
解决方案
这是@MickaelMaison's answer给出的想法的实现。我使用了 kafka-python。
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
kafka_listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:\n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
轮询在不同的线程中完成。收到消息后,通过传递从 Kafka 检索到的数据来调用侦听器。
推荐阅读
- python - python请求中找不到404的解决方案吗?
- salesforce - 在我的单元测试期间,断言语句在测试类中失败
- php - axios.delete 的“尝试获取非对象的属性‘参数’”。后端:PHP 前端:React
- angular - 角度数据未定义
- scala - 如何处理在scala中解析json的异常
- vue.js - Vuejs,当我在商店模块中访问它时,为什么 this.$store.state.route.params.activityId 未定义
- html - 输入元素在聚焦时覆盖按钮
- python - 如何从 [c_api.h] 向 TF_SessionRun 提供训练数据
- java - 为什么 IntelliJ 将注释标头放在错误的位置……仅用于测试?
- javascript - 如何在 Highcharter 中使用 JavaScript?