首页 > 解决方案 > 卡夫卡消费者间歇性挂起

问题描述

我有一个将多个 Kafka 主题加载到数据库中的过程。

它可以工作并加载数据,但消费者会间歇性地挂起 7-12 小时,然后返回消息。

它只发生在我列表中的 11 个主题中的 1 个或 2 个。

由于主题具有不同的特征(有些是 json,有些是 avro,有些是加密的)我为每个主题使用了一个消费者。

在下面的缩短代码中;

  1. 我从 topic_list json 文件中阅读了主题
  2. 为每个消费者/主题创建侦听器对象
  3. 监听主题并处理要加载的数据
from kafka import KafkaConsumer

class KafkaListener:
    def init(self,topic):

        self.topic_name = str(topic.get("topic_name")). # coming from topics_list json file
        self.offset = topic.get("offset", "earliest") # default earliest, would like to be able to control per topic
        self.timeout = topic.get("timeout", 24000) # default to 24,000 would like to be able to control per topic

        self.consumer = KafkaConsumer(
            group_id="group1",
            bootstrap_servers=bootstrap_server_address,
            auto_offset_reset=self.offset,
            value_deserializer=deserializer,
            consumer_timeout_ms=self.timeout,
        )
        self.consumer.subscribe([self.topic_name])

    def load(self):
        log(f"Started listening topic: {self.topic_name}")

        for message in self.consumer:
            < process the message >

        log(f"Load completed for topic: {self.topic_name}")

# read topics from file
with topics_list_file.open("r") as f_topics:
    topics_read = json.load(f_topics)

# build listeners for each topic
for topic in topics:
    listeners[topic] = KafkaListener(topics[topic])

# listen the topics, load data
while True:
    for listener in listeners:
        listener.load()  

主题列表文件

[
{
    "topic_name": "json_topic1",
    "message_type": "json"
},
{
    "topic_name": "avro_topic1",
    "message_type": "avro",
    "avro_schema": "avro_topic1_schema"
},
{
    "topic_name": "avro.topic2",
    "message_type": "avro",
    "avro_schema": "avro_topic2_schema"
},
{
    "topic_name": "json.topic2",
    "message_type": "json",
    "encrypted": "True"
}
]

最新日志文件的摘录

...
2020-04-23 13:03:46 Started listening topic: json_topic1
2020-04-23 20:10:09 Load completed for topic: json_topic1
...   

在此示例中,侦听器在返回数据之前等待了大约 7 个小时。

我怎样才能防止这种延迟?

我应该对这个用例使用其他方法吗?

更新:添加了设置topic names和默认值的代码offsettimeout

标签: pythonkafka-consumer-api

解决方案


推荐阅读