How to receive data from Kafka starting from a specific date

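Problem description

I want to subscribe to a topic and receive data starting from a specific date. The Seek() call throws an exception:

Confluent.Kafka.KafkaException: Local: Erroneous state

My code: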

adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
    .Topics
    .First(x => x.Topic == _config.Topic)
    .Partitions;
var partitionsOffsets = partitions
    .Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));

consumer = CreateConsumer();

foreach (var p in partitions)
{
    consumer.Assign(new TopicPartition(_config.Topic, p.PartitionId));
}

var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));

//await Task.Delay(1000);

foreach (var o in offsets)
{
    consumer.Seek(o);
}

If I add a wait, await Task.Delay(1000);, before the loop, Seek() does not throw. How can I set the offsets by date without Task.Delay?

Tags: c#, apache-kafka, kafka-consumer-api

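Solution

The following code works correctly. Instead of assigning the partitions and then calling Seek(), it passes the offsets returned by OffsetsForTimes() directly to Assign(), so no Seek() call and no delay are needed: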

var adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
    .Topics
    .First(x => x.Topic == _config.Topic)
    .Partitions;
var partitionsOffsets = partitions
    .Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));

consumer = CreateConsumer();

var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));

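// Assign() replaces the current assignment on every call, so pass all
// partitions and their starting offsets in a single call.
consumer.Assign(offsets);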
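For reference, here is a fuller sketch of the same approach, including a consume loop. It is only an illustration under a few assumptions: the class name SeekByDateSketch, the helper BuildQueries, and the method ConsumeFrom are hypothetical names that do not appear in the original code, string keys and values are assumed, and the 10-second timeouts are arbitrary. The whole offset list is passed to Assign() in one call, because each Assign() call replaces the previous assignment.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;

public static class SeekByDateSketch
{
    // Hypothetical helper: builds one timestamp query per partition.
    // Assumes dateUtc has DateTimeKind.Utc.
    public static List<TopicPartitionTimestamp> BuildQueries(
        string topic, IEnumerable<int> partitionIds, DateTime dateUtc)
    {
        var timestamp = new Timestamp(dateUtc);
        return partitionIds
            .Select(id => new TopicPartitionTimestamp(topic, new Partition(id), timestamp))
            .ToList();
    }

    // Hypothetical entry point: resolves offsets for the date, assigns them,
    // and prints messages until cancellation is requested.
    public static void ConsumeFrom(
        IConsumer<string, string> consumer,
        IAdminClient adminClient,
        string topic,
        DateTime dateUtc,
        CancellationToken token)
    {
        // Discover the partitions of the topic.
        var metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(10));
        var partitionIds = metadata.Topics
            .First(t => t.Topic == topic)
            .Partitions
            .Select(p => p.PartitionId);

        // Look up, per partition, the first offset at or after the timestamp.
        var offsets = consumer.OffsetsForTimes(
            BuildQueries(topic, partitionIds, dateUtc),
            TimeSpan.FromSeconds(10));

        // Assign all partitions with their starting offsets in one call.
        consumer.Assign(offsets);

        while (!token.IsCancellationRequested)
        {
            // Consume returns null when the timeout elapses without a message.
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null)
            {
                continue;
            }

            Console.WriteLine(
                $"{result.TopicPartitionOffset} {result.Message.Timestamp.UtcDateTime:o} {result.Message.Value}");
        }
    }
}
```

With this shape, the consumer from CreateConsumer() and the admin client from the code above could be passed in as, for example, SeekByDateSketch.ConsumeFrom(consumer, adminClient, _config.Topic, _config.OffsetDateUtc, cancellationToken), where cancellationToken is whatever token the calling code already has.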

