c# - 使用 .NET `Confluent.Kafka` `subscribe`/`assign` 到 Kafka 主题的正确方法是什么?
问题描述
Task.Run(
() =>
{
try
{
var (topic, partitionOrNull, offsetOrNull) = target;
if (partitionOrNull == null && offsetOrNull == null) consumer.Subscribe(topic);
else
{
var partition = partitionOrNull ?? 0;
if (offsetOrNull != null) consumer.Assign(new TopicPartitionOffset(topic, partition, offsetOrNull.Value));
else consumer.Assign(new TopicPartition(topic, partition));
}
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF) continue;
observer.OnNext((consumeResult.Offset.Value, consumeResult.Key, consumeResult.Value));
}
}
catch (Exception exception)
{
observer.OnError(exception);
}
},
cancellationToken);
return Task.FromResult<IDisposable>(consumer);
});
因此,有一个 Kafka 消费者(只是 latest 的一个小包装器Confluent.Kafka
)涵盖了两种情况:subscribe
使用 dynaimc 重新平衡和assign
ing 到指定的分区和/或偏移量。问题是当我只指定topic
( partition == null && offset == null
) 时,消费者无限挂起而没有任何进展。我是在滥用框架还是发生了什么?
PS 准确地指定所有三个参数效果很好。
解决方案
推荐阅读
- or-tools - 如何使用 or-tools 在 bool 数组中制作 K 个不同的元素?
- microservices - 微服务聚合器服务 BFF
- python - 使用索引号同时更改 pandas 数据框中的多个列名(不是所有列名)
- android - 使用 Mircosoft 图形 REST API 进行自动发现
- html - href vs 链接反应?JS 不是基于 HTML 标签加载的
- php - 从一个表中获取数组数据并插入到另一个表中
- python - 如何从嵌套元组列表中生成批量数据?
- javascript - Javascript用0替换最后一个数字而不是用指数符号表示数字
- sql - 基于某些内容的总和表
- java - 通过 jni 从 Java 调用时,C++ iostreams 不起作用