c# - 轮询消息分配到分区不适用于 asp.net 核心中没有密钥的消息
问题描述
尝试以循环方式向所有分区发送消息,但所有消息都进入最后一个分区。任何人都可以帮我吗?
我正在使用Confluent.Kafka nuget 包。我的生产者配置 -
"ProducerConfiguration": {
"bootstrap.servers": "localhost:9092"
}
还有我的 Kafka 制作人班——
public class Publisher
{
private ProducerConfig _producerConfig;
public Publisher(IOptions<ApplicationSetting> _applicationSetting)
{
var producerConfig = _applicationSetting.Value.KafkaConfiguration.ProducerConfiguration;
_producerConfig = new ProducerConfig(producerConfig);
}
public async Task<DeliveryResult<Null, TValue>> Publish<TValue>(TValue message, string topic = null)
{
using var producer = new ProducerBuilder<Null, TValue>(_producerConfig)
.SetValueSerializer(new JsonSerializer<TValue>())
.Build();
var topicName = String.IsNullOrEmpty(topic) ? message.GetType().Name : topic;
return await producer.ProduceAsync(topicName, new Message<Null, TValue>() { Value = message });
}
}
发布消息,例如-
public class DemoHandler : IRequestHandler<SendMail, string>
{
private readonly Publisher _publisher;
public DemoHandler(Publisher publisher)
{
_publisher = publisher;
}
public async Task<string> Handle(SendMail message, CancellationToken cancellationToken)
{
await _publisher.Publish(message);
return "Message sent";
}
}
所有消息都只发送到最后一个分区 -
提前致谢。
解决方案
推荐阅读
- docker - 使用 JDBC 输入插件在 Docker 上的 Logstash 不会从 SQL Server 获取所有行
- flutter - 如何突出显示我在字符串中搜索的确切单词,而不仅仅是字符串的开头?
- flutter - 如何实时获取文档长度
- javascript - Reactjs 从获取请求中返回一个对象
- javascript - Nginx 正在尝试打开文件而不是重定向到代理
- python - _joint_log_likelihood 给我错误的值
- sql - 如何按日期获取最后一件商品的价格
- python-3.x - 定期执行功能而不停止其他操作
- r - 使用 R 从 PDF 表单到数据框的文本挖掘
- r - 如果 r 中出现错误,则转到 lapply() 的下一次迭代