.net - 异步的Kafka生产者不返回DeliveryReport但DeliveryResult
问题描述
我正在尝试将消息写入 Kafka,下面是我的生产者,如果我使用生产它有 DeliveryHandler 并且我可以访问 DeliveryReport,但是当我使用ProduceAsync时返回类型是 DeliveryResult 我如何获取 DeliveryReport 并记录失败的原因
使用生产:
public void WriteMessage(string message)
{
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
producer.Produce(this._topicName, new Message<string, string>()
{
Key = rand.Next(5).ToString(),
Value = message
},
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
}
});
producer.Flush(TimeSpan.FromSeconds(10));
}
}
在上面的代码中,我可以访问继承 DeliveryResult 的 DeliveryReport,并且可以访问 Error Reason 和 DeliveryResult --> TopicPartitionOffset,以下是元数据:
namespace Confluent.Kafka
{
//
// Summary:
// The result of a produce request.
public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
{
public DeliveryReport();
//
// Summary:
// An error (or NoError) associated with the message.
public Error Error { get; set; }
//
// Summary:
// The TopicPartitionOffsetError associated with the message.
public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
}
}
使用 ProduceAsync
public async Task WriteAysncMessage(string message)
{
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
{
Key = rand.Next(5).ToString(),
Value = message
});
producer.Flush(TimeSpan.FromSeconds(60));
}
}
在上述方法中,使用 ProducerAsync 时,我如何访问 DeliveryReport 以记录错误原因,就像 Produce 一样,当我在 ProducerAsync 上等待时,它返回 DeliveryResult 但不返回 DeliveryReport
此外,在写入 Kafka 时使用 Produce 或 ProduceAsync 也很好。
解决方案
我想我得到了解决方案:
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
try
{
var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
{
Key = rand.Next(5).ToString(),
Value = message
});
Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
}
producer.Flush(TimeSpan.FromSeconds(60));
}
推荐阅读
- php - 雄辩的获取所有列,但检查一个不同的列
- flutter - Flutter 识别循环或 listview.builder 构建的 Widget
- postgresql - 在 WSL2 中运行的 Flask 无法连接到在 Windows 上运行的 postgres(与 WSL1 一起工作)
- intellij-idea - 使用 IntelliJ IDEA 从 JFR 转储生成火焰图
- sql - SQL 查询仅查找具有 2 个源值的客户 ID
- keycloak - 部署的 Keycloak 脚本映射器未显示在 GUI 中
- c - C 中跨进程的条件等待和信号
- vim - 如何在 .vimrc 的 gvim cabbrev 中使用管道运算符 (|)?
- django - 如何保存实例并更新模型表单中的反向 M2M 字段
- node.js - axios Promise.all 永无止境