首页 > 解决方案 > 使用 .NET `Confluent.Kafka` `subscribe`/`assign` 到 Kafka 主题的正确方法是什么?

问题描述

Task.Run(
                        () =>
                        {
                            try
                            {
                                var (topic, partitionOrNull, offsetOrNull) = target;

                                if (partitionOrNull == null && offsetOrNull == null) consumer.Subscribe(topic);
                                else
                                {
                                    var partition = partitionOrNull ?? 0;
                                    if (offsetOrNull != null) consumer.Assign(new TopicPartitionOffset(topic, partition, offsetOrNull.Value));
                                    else consumer.Assign(new TopicPartition(topic, partition));
                                } 

                                while (!cancellationToken.IsCancellationRequested)
                                {
                                    var consumeResult = consumer.Consume(cancellationToken);

                                    if (consumeResult.IsPartitionEOF) continue;

                                    observer.OnNext((consumeResult.Offset.Value, consumeResult.Key, consumeResult.Value));
                                }
                            }
                            catch (Exception exception)
                            {
                                observer.OnError(exception);
                            }
                        },
                        cancellationToken);

                    return Task.FromResult<IDisposable>(consumer);
                });

因此,有一个 Kafka 消费者(只是 latest 的一个小包装器Confluent.Kafka)涵盖了两种情况:subscribe使用 dynaimc 重新平衡和assigning 到指定的分区和/或偏移量。问题是当我只指定topic( partition == null && offset == null) 时,消费者无限挂起而没有任何进展。我是在滥用框架还是发生了什么?

PS 准确地指定所有三个参数效果很好。

标签: c#.netapache-kafkatask-parallel-libraryconfluent-platform

解决方案


推荐阅读