apache-kafka - 没有消息时从 Kafka 消费者返回
问题描述
我想使用Confluent dotnet client在应用程序启动中处理一个主题。假设以下示例:
while (true)
{
try
{
var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
当 Kafka 中没有新消息时,c.Consume 将被阻塞。因为我想用它来启动应用程序(比如缓存预热),所以当我发现没有新消息时,我想继续我的代码。
我知道设置超时存在过载,c.Consume(timeout)
但这种方法的问题是,如果您的主题中有一条消息,并且阅读消息的持续时间超过了您的超时,您会收到不希望的空输出。
解决方案
消费者不应该知道生产者。
现在,如果您想知道从开始消费的那一刻起您已经阅读了该主题中的所有内容,您可以:
- 在开始消费之前加载最新的偏移量。
- 然后开始消费消息。
- 如果消息的偏移量与您之前加载的最新偏移量相同,请停止消费。
我不是C#
开发人员,但从我在 dotnet confluent 文档中读到的内容,您可以呼吁QueryWatermarkOffsets
消费者获取最旧和最新的偏移量。https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_
然后,在Message
类上你有一个Offset
访问器。所以整个事情不应该太难实现。
https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
推荐阅读
- neo4j - 无法解析的日期:带有 APOC.DATE.PARSE 的“NULL”
- enums - Android Room 类型转换多种枚举类型
- mapbox - MapQuest 方向 api 返回的点太少
- r - R 自动更改日期格式
- swift - 如何快速对表格单元格中的同一按钮执行不同的操作?
- api - Google Vision:如何选择所有类型的检测
- wordpress - 如何减慢 WordPress 网站的速度
- grails - Grails 3.3.6 没有为命令 gradle docs 触发 DocStart 和 DocEnd 事件
- android - 如何以编程方式关闭运行时权限对话框?
- c# - 使用 Postsharp 在运行时更改属性类变量