c# - 重新使用给定时间的 Kafka 消息
问题描述
我正在使用 Confluent.Kafka .NET 客户端版本 1.3.0。我想从给定时间开始使用消息。
为此,我可以OffsetsForTimes
用来获取所需的偏移量和Commit
该分区的偏移量:
private void SetOffset()
{
const string Topic = "myTopic";
const string BootstrapServers = "server1, server2";
var adminClient = new AdminClientBuilder(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
var consumer = new ConsumerBuilder<byte[], byte[]>(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "group.id", this.groupId },
{ "enable.auto.commit", "false" },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
// Timestamp to which the offset should be set to
var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);
var newOffsets = new List<TopicPartitionOffset>();
var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
foreach (var topicMetadata in metadata.Topics)
{
if (topicMetadata.Topic == Topic)
{
foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
{
var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);
IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
TimeSpan.FromSeconds(5));
newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
}
}
}
consumer.Commit(newOffsets);
// Consume messages
consumer.Subscribe(Topic);
var consumerResult = consumer.Consume();
// process message
//consumer.Commit(consumerResult);
}
如果我想跳过消息并跳转到给定的偏移量,如果我想跳转到的偏移量在最后提交的消息之后,这很好用。
但是,如果给定的时间戳在最后提交的消息的时间戳之前,则上述方法将不起作用。在上面的代码中,如果在timeStamp
最后提交的消息的时间戳之前,那么OffsetsForTimes
将返回最后提交的消息的偏移量+1。即使我手动将偏移量设置为较低的偏移量,那么consumer.Commit(newOffsets)
似乎没有效果,我消费时收到第一条未提交的消息。
有没有办法从代码中实现这一点?
解决方案
如果您分配给每个分区并说明开始读取的偏移量,您就可以做到这一点。
这是您获取主题分区列表的方式:
public static List<TopicPartition> GetTopicPartitions(string bootstrapServers, string topicValue) {
var tp = new List<TopicPartition>();
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
meta.Topics.ForEach(topic => {
if (topic.Topic == topicValue) {
foreach (PartitionMetadata partition in topic.Partitions) {
tp.Add(new TopicPartition(topic.Topic, partition.PartitionId));
}
}
});
}
return tp;
}
这是您找到特定时间的偏移量的方法:
List<TopicPartition> topic_partitions = frmMain.GetTopicPartitions(mBootstrapServers, txtTopic.Text);
using (var consumer = new ConsumerBuilder<Ignore, string>(cfg).Build()) {
consumer.Assign(topic_partitions);
List<TopicPartitionTimestamp> new_times = new List<TopicPartitionTimestamp>();
foreach (TopicPartition tp in topic_partitions) {
new_times.Add(new TopicPartitionTimestamp(tp, new Timestamp(dtpNewTime.Value)));
}
List<TopicPartitionOffset> seeked_offsets = consumer.OffsetsForTimes(new_times, TimeSpan.FromSeconds(40));
string s = "";
foreach (TopicPartitionOffset tpo in seeked_offsets) {
s += $"{tpo.TopicPartition}: {tpo.Offset.Value}\n";
}
Console.WriteLine(s);
consumer.Close();
}
这是您通过分配给所有主题分区和特定偏移量来使用的方式:
using (var consumer =
new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, e) => Log($"Error: {e.Reason}"))
.Build()) {
consumer.Assign(seeked_offsets);
try {
while (true) {
try {
var r = consumer.Consume(cancellationToken);
// do something with r
} catch (ConsumeException e) {
//Log($"Consume error: {e.Error.Reason}");
}
}
} catch (OperationCanceledException) {
//Log("Closing consumer.");
consumer.Close();
}
}
如果您坚持将其应用于消费者组,则另一种选择是重置消费者组并使用您的代码,或者创建一个新的消费者组。
推荐阅读
- javascript - 有没有办法在字符串上有条件地引用诸如 .replace() 之类的函数?
- c++ - 是否可以从作为用户输入的字符串或从文件中以 C/C++(或任何语言)等语言创建用户定义的数据类型
- node.js - 如何在 Cypress.io 中使用 Firebase Auth 的 Google 登录选项
- html - 锚元素在 DOM 中移动
- xcode - Set Interface Builder Tag
- google-play - Google Play Product Id
- javascript - react router is only replacing the final route on history.push
- swift - macOS 上是否存在与 kIOPSCurrentCapacityKey 等效的电池电量更改通知?
- icons - Maya 2015-自定义货架按钮缺少图标
- spring-boot - Springboot 无法使用 springboot 2.0.3 获取连接(用户名,密码),但在 1.5.3 中工作正常