首页 > 解决方案 > Kafka .Net Produce 与 ProduceAsync

问题描述

我正在使用 log4net (.Net) 编写 kafka appender,但遇到了无法使用 await 的问题ProduceAsync

错误

此时无法启动异步操作。异步操作只能在异步处理程序或模块内或在页面生命周期中的某些事件期间启动。如果在执行 Page 时发生此异常,请确保将 Page 标记为<%@ Page Async="true" %>。此异常还可能表示尝试调用“async void”方法,在 ASP.NET 请求处理中通常不支持该方法。相反,异步方法应该返回一个任务,调用者应该等待它。,
堆栈跟踪:在 System.Web.AspNetSynchronizationContext.OperationStarted(at System.Runtime.CompilerServices.AsyncVoidMethodBuilder.Create()

代码

public class CustomAppender: AppenderSkeleton
{
     private IProducer<Null, string> p;

     public override void ActivateOptions()
     {
         // Setup kafka producer
     }

     protected override void Append(LoggingEvent loggingEvent)
     {
         // Get JSON from application
         // Add additional data to the json 
         callBroker(json, topic);
     }

     private async void callBroker(string json, string topic)
     {
         var result = await p.ProduceAsync(Topic, new Message<Null, string>{Value=json});
     }          
}

我可以在我的callBroker方法中返回 Task,但是 Append 方法没有异步覆盖。

所以我的问题是,我可以ProduceAsync在大容量环境中使用 Producer.Produce 吗?该程序将记录 >500 条消息/秒,是否有偏好哪个效果更好?如果特定错误代码失败,我还需要处理一些异常并采取一些措施。

标签: c#apache-kafkalog4netconfluent-platformkafka-producer-api

解决方案


同步版本

protected override void Append(LoggingEvent loggingEvent)
{
    CallBroker(topic, json);
}

private void CallBroker(string topic, string message)
{
    producer.Produce(topic, new Message<Null, string> { Value = message });
} 

半异步版本

如果您无法更改Append方法的签名,则可以通过以下方式在阻塞模式下调用异步方法:

protected override void Append(LoggingEvent loggingEvent)
{
    CallBrokerAsync(topic, json).GetAwaiter().GetResult();
}

private async Task CallBrokerAsync(string topic, string message)
{
    await producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
}

异步使用时会发光(从最顶层的入口点到所有层,直到调用异步 I/O 操作的最低组件)

与往常一样,测量、测量和测量以了解此更改如何影响您的应用程序。


推荐阅读