首页 > 解决方案 > C#:OperationCanceledException:操作被取消

问题描述

下面我试图将数据发送到 eventhub,它工作了几分钟,然后抛出了OperationCanceledException 。关于我在使用 CancellationToken 时哪里错的任何提示(如果那是我应该使用的)?或者我该如何解决这个问题?

public async void send<T>(IEnumerable<T> list, string eventhubname)
{                
     var token = new CancellationTokenSource();
     CancellationToken ct = token.Token;
     EventHubProducerClient producer = null;

     try
     {

        producer = new EventHubProducerClient(this._connectionString, eventhubname);

        var eventBatch = await producer.CreateBatchAsync(ct); **Line 148 here**
        foreach (T item in list)
        {
            eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(item.ToString())));
        }
       await producer.SendAsync(eventBatch);
       await producer.DisposeAsync();
     }
    catch (Exception ex)
    {
       //($"Error While sending message to Event Hub: { ex.Message}", ex);
        if (producer != null)
        {
            await producer.DisposeAsync();
        }
        if (ct.IsCancellationRequested)
        {
        token.Dispose();
        throw new TaskCanceledException(ex.Message);
        }
        throw;
    }
}

以下是异常梗


System.OperationCanceledException: The operation was canceled.
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpCbsLink.SendTokenAsyncResult.<>c__DisplayClass13_0.<GetAsyncSteps>b__3(SendTokenAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.Azure.Amqp.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpCbsLink.<>c__DisplayClass4_0.<SendTokenAsync>b__1(IAsyncResult a)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.<CreateSendingLinkAsync>d__63.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.<OpenProducerLinkAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Azure.Messaging.EventHubs.Amqp.AmqpProducer.<CreateLinkAndEnsureProducerStateAsync>d__32.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.<OnCreateAsync>d__6.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.Amqp.Singleton`1.<GetOrCreateAsync>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.Amqp.Singleton`1.<GetOrCreateAsync>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Azure.Messaging.EventHubs.Amqp.AmqpProducer.<CreateBatchAsync>d__29.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Azure.Messaging.EventHubs.Producer.EventHubProducerClient.<CreateBatchAsync>d__42.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at <send>d__20`1.MoveNext() in  line 148

标签: c#async-awaitazure-eventhub

解决方案


异常OperationCanceledException通常意味着事件中心服务操作超时。在您的堆栈跟踪中,客户端似乎在尝试建立到服务的 AMQP 链接并发送授权令牌时超时。

这通常表明与服务的网络通信存在问题。如果没有更多关于您的代码运行环境的上下文,我只能推测原因。

一种常见情况是在无法使用原始 TCP 通信的环境(例如 Xamarin Android)中运行时。另一种常见情况是在防火墙规则过滤传出连接的环境中运行时。对于 TCP 传输,您需要确保标准 AMQP 端口 5671 和 5672 已打开并可用于传出连接。

要解决这两种情况,您可能需要尝试将TransportType您的EventHubProducerClientOptionsto设置为EventHubsTransportType.AmqpWebSockets.

例如:

var options = new EventHubClientOptions();
options.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets;

await using var producer = new EventHubProducerClient(
    "<< CONNECTION STRING >>", 
    "<< EVENT HUB NAME >>", 
    options);

// MORE CODE...

关于您的代码段,我想提及的一件重要事情是您可能会丢失数据。因为您忽略了 的返回值TryAdd,所以如果您传递的可枚举大于可以在单个批次中发送的可枚举,那么您将默默地无法添加它们。

我建议您要么考虑尊重返回,TryAdd要么使用SendAsync接受一组事件的重载。在前一种情况下,如果TryAdd返回false,那么您知道批次已满,您应该将您的集合分成多个批次。在后一种情况下,如果集合太大而无法在单个调用中发送,则调用将失败。

对于一些额外的想法:

  • 我看不出您需要创建取消令牌的原因,因为您没有使用它来请求取消发送,因此您可能会跳过该步骤。

  • 生产者客户端为方便起见允许处置;像 一样HttpClient,作为长期客户端使用是有效的。如果您在一段时间内发送数据,我建议您创建一次,然后仅在您的应用程序关闭或您完成发送一段时间后关闭/处置。

  • 是一次性的EventDataBatch,并且确实包含对非托管项目的引用。我建议确保您在发送操作完成时处理它。

将一些反馈付诸行动,同时将生产者的范围限制在单个方法调用中,示例如下所示:

public async void Send<T>(IEnumerable<T> data, string eventHubName)
{      
    var options = new EventHubClientOptions();
    options.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets; 
         
    await using var producer = new EventHubProducerClient(
        this._connectionString, 
        eventHubName, 
        options);

    try
    {
        var eventSet =
            data.Select(item => new EventData(Encoding.UTF8.GetBytes(item.ToString()));

        await producer.SendAsync(eventSet).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        Log($"Error While sending message to Event Hub: { ex.Message}", ex);
        throw;
    }
}

有关更全面的示例,您可能需要查看:


推荐阅读