python - 卡夫卡消费者间歇性挂起
问题描述
我有一个将多个 Kafka 主题加载到数据库中的过程。
它可以工作并加载数据,但消费者会间歇性地挂起 7-12 小时,然后返回消息。
它只发生在我列表中的 11 个主题中的 1 个或 2 个。
由于主题具有不同的特征(有些是 json,有些是 avro,有些是加密的)我为每个主题使用了一个消费者。
在下面的缩短代码中;
- 我从 topic_list json 文件中阅读了主题
- 为每个消费者/主题创建侦听器对象
- 监听主题并处理要加载的数据
from kafka import KafkaConsumer
class KafkaListener:
def init(self,topic):
self.topic_name = str(topic.get("topic_name")). # coming from topics_list json file
self.offset = topic.get("offset", "earliest") # default earliest, would like to be able to control per topic
self.timeout = topic.get("timeout", 24000) # default to 24,000 would like to be able to control per topic
self.consumer = KafkaConsumer(
group_id="group1",
bootstrap_servers=bootstrap_server_address,
auto_offset_reset=self.offset,
value_deserializer=deserializer,
consumer_timeout_ms=self.timeout,
)
self.consumer.subscribe([self.topic_name])
def load(self):
log(f"Started listening topic: {self.topic_name}")
for message in self.consumer:
< process the message >
log(f"Load completed for topic: {self.topic_name}")
# read topics from file
with topics_list_file.open("r") as f_topics:
topics_read = json.load(f_topics)
# build listeners for each topic
for topic in topics:
listeners[topic] = KafkaListener(topics[topic])
# listen the topics, load data
while True:
for listener in listeners:
listener.load()
主题列表文件
[
{
"topic_name": "json_topic1",
"message_type": "json"
},
{
"topic_name": "avro_topic1",
"message_type": "avro",
"avro_schema": "avro_topic1_schema"
},
{
"topic_name": "avro.topic2",
"message_type": "avro",
"avro_schema": "avro_topic2_schema"
},
{
"topic_name": "json.topic2",
"message_type": "json",
"encrypted": "True"
}
]
最新日志文件的摘录
...
2020-04-23 13:03:46 Started listening topic: json_topic1
2020-04-23 20:10:09 Load completed for topic: json_topic1
...
在此示例中,侦听器在返回数据之前等待了大约 7 个小时。
我怎样才能防止这种延迟?
我应该对这个用例使用其他方法吗?
更新:添加了设置topic names
和默认值的代码offset
和timeout
解决方案
推荐阅读
- java - 为什么我不能通过 Jackson (codehouse) 调用自定义 JSON 反序列化方法?
- c# - 如何匹配两个字符串,一个包含“and”,另一个包含“&”或“+”?
- sql - Sqlserver SQL 加入日期(无有效字段)
- android - Flutter 告警应用权限
- vba - VBA-将日期从excel复制到csv时,日期和月份数字交换位置
- python-3.x - python中递归函数的解释
- configuration - 需要帮助来配置 CDN
- mysql - 为什么我使用 MYSQL 的关键字会出现这样的错误
- r - R中数据框列表中的变量类
- javascript - 检查两个圆之间的距离