c# - 如何从特定日期接收来自 kafka 的数据
问题描述
我想订阅主题并接收特定日期的数据。方法调用抛出异常:
Confluent.Kafka.KafkaException:本地:错误状态
我的代码:
adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
foreach (var p in partitions)
{
consumer.Assign(new TopicPartition(_config.Topic, p.PartitionId));
}
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
//await Task.Delay(1000);
foreach (var o in offsets)
{
consumer.Seek(o);
}
如果我添加等待:await Task.Delay(1000);
。方法Seek()
不会抛出异常。我如何按日期设置偏移量,没有Task.Delay
?
解决方案
下一个代码工作正确!
var adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
foreach (var o in offsets)
{
consumer.Assign(o);
}
推荐阅读
- visual-studio - assemblyBinding 理解 Newtonsoft.Json 和 Common.Logging 连接
- python - dask 分布式客户端中的 ImportError
- jquery-jtable - jquery jtable deleteConfirmation功能不起作用
- android - 获得最多 YouTube 视频信息(如缓冲时间、视频质量等)的最佳方式是什么?
- javascript - 带有 intro.js 透明的 Bootstrap 4 模态
- python - 如何根据单击的按钮选择行编辑
- python - 获取 Tweepy 的响应并发送到 Django 模板
- vba - Vba/Macro 粘贴到下一个可用行
- javascript - 从嵌入式 Plunker/StackBlitz 等代码编辑器发送和获取数据
- owl-carousel-2 - owl-carousel 在完成加载之前垂直显示项目