首页 > 解决方案 > 是否有用于事件驱动的 Kafka 消费者的 Python API?

问题描述

我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发,并通过将消息推送回 Kafka 流来响应。

我一直在寻找类似 Spring 实现的东西:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Messasge in group mygroup: " + message);
}

我看过:

  1. 卡夫卡蟒蛇
  2. 皮卡夫卡
  3. 融合卡夫卡

但我在 Python 中找不到与事件驱动的实现风格相关的任何内容。

标签: pythoneventsflaskapache-kafkalistener

解决方案


这是@MickaelMaison's answer给出的想法的实现。我使用了 kafka-python

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
    def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
            kafka_listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

轮询在不同的线程中完成。收到消息后,通过传递从 Kafka 检索到的数据来调用侦听器。


推荐阅读