.net - Kafka 消费者读取整个分区后会发生什么?
问题描述
我确实有一个在 Windows 服务中使用的 .Net 代码,该服务具有初始化消费者对象和使用 Kafka 的过程。
var config = new ProducerConfig
{
BootstrapServers = "host1:9092,host2:9092,...",
ClientId = ...,
...
};
ConsumerBuilder<byte[], byte[]> c = new ConsumerBuilder<byte[], byte[]>(config ).SetErrorHandler(_Consumer_OnError);
consumer = c.Build();
consumer.Subscribe(topics);
while (!canceled)
{
var consumeResult = consumer.Consume(cancellationToken);
//handle consumed message.
...
}
我想知道从分区中消耗完所有消息后会发生什么。仅供参考:有 3 个分区和 4 个节点用于消费信息。
解决方案
从confluent-kafka-dotnet,ConsumeResult<T1, T2> Consume(CancellationToken ct)
方法将:
轮询新消息/事件。阻塞直到消费结果可用或操作被取消。
这意味着调用Consume()
将永远不会真正停止尝试使用分区中的消息(除非发生崩溃或您取消操作)。因此,如果您在分区中有两个事件,则第三次调用Consume()
将阻塞,直到将新消息写入分区。
在该Consume
方法的内部,Kafka 客户端不断调用代理以检查是否有新事件写入分区。
推荐阅读
- .net - 如何通过 web.config 文件重定向到我的本地
- python - 未获得测试数据的正确 R2 分数
- react-native - React Native 如何将数据从函数组件传递到类组件
- json - Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中
- c - 函数声明中 '(const struct namect*)' 背后的含义是什么?
- java - 如何以多线程方式从 Java 代码运行 JMTER 脚本 (JMX)
- angular - Angular 11 选择了 mat-select 的默认值,填充了 observable
- reactjs - useEffect() 和 Google Firebase onAuthStateChange
- python - 使用 cursor.execute 到 pandas 的 SQL 行
- r - 陷入错误:软件包“XXXXX”的编译失败