首页 > 解决方案 > 代码从捕获中掉出来?

问题描述

我有一个后台任务每 200 毫秒轮询一次 SQL Server 数据库。

代码如下所示:

listener = await Task.Factory.StartNew(async () =>
            {
                try
                {
                    while (true)
                    {
                        topToken.ThrowIfCancellationRequested();

                        try
                        {
                            using (var dbConnection = new SqlConnection(ConnectionString))
                            using (var command = new SqlCommand("marc.GetEvents", dbConnection))
                            {
                                await command.Connection.OpenAsync().ConfigureAwait(false);
                                command.CommandType = CommandType.StoredProcedure;
                                command.Parameters.AddWithValue("@fromId", lastEventId);

                                using (var reader = await command.ExecuteReaderAsync(topToken).ConfigureAwait(false))
                                {
                                    int received = lastEventId;
                                    while (await reader.ReadAsync(topToken).ConfigureAwait(false))
                                    {
                                        /// do stuff...
                                    }
                                    lastEventId = received;
                                }
                            }
                            await Task.Delay(PollIntervalMilliseconds, topToken).ConfigureAwait(false);
                        }
                        catch (OperationCanceledException)
                        {
                            throw;
                        }
                        catch (Exception ex)
                        {
                            if (ex is SqlException && topToken.IsCancellationRequested)
                            {
                                throw new OperationCanceledException("Operation cancelled by user", ex);
                            }

                            logger.Warn(ex, $"Exception on polling Codeks db. Waiting {delayOnSqlError}ms..."); // this is hit
                            _OnReaderEvent.OnError(ex);
                            await Task.Delay(delayOnSqlError, topToken).ConfigureAwait(false); // probably not executed
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    logger.Info("Listening task ended. Service is stopping?");
                }
                catch (Exception ex)
                {
                    logger.Error(ex, "General exception"); // falling here
                }
            }, TaskCreationOptions.LongRunning).ConfigureAwait(false);

今天我收到一份报告,说这个任务提前结束了。根据日志,第一catch组命中并报告 SQL 异常:

2018-08-01 17:42:08.6348|警告|轮询 Codeks db 时出现异常。等待 5000 毫秒... System.Data.SqlClient.SqlException (0x80131904): Transaction (Process ID 53) was deadlocked on lock | 与另一个进程通信缓冲区资源,并已被选为死锁牺牲品。重新运行事务。

但它不是延迟,而是立即从循环中掉出到外部catch,但有同样的例外。

2018-08-01 17:42:08.6488|错误|Jantar.CodeksConnector|一般异常 System.Data.SqlClient.SqlException (0x80131904): 事务 (进程 ID 53) 被锁死锁 | 与另一个进程通信缓冲区资源,并已被选为死锁牺牲品。重新运行事务。在 System.Data.SqlClient.SqlConnection.OnError(SqlException 异常,布尔 breakConnection,操作1 wrapCloseInAction) at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action1 wrapCloseInAction) 在 System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose) 在 System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject state&Obj, Boolean dataReady) 在 System.Data.SqlClient.SqlDataReader.TryReadInternal(Boolean setTimeout, Boolean& more) 在 System.Data.SqlClient.SqlDataReader.TryHasMoreRows(Boolean& moreRows) 在 System.Data.SqlClient.SqlDataReader.<>c__DisplayClass189_0.b__0(Task t ) 在 System.Data.SqlClient.SqlDataReader.InvokeRetryable[T](Func2 moreFunc, TaskCompletionSource1 来源,IDisposable objectToDispose) --- 从先前引发异常的位置结束堆栈跟踪 --- 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task ) 在 Jantar.CodeksConnector 的 System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)。<b__18_0>d.MoveNext() --- 从先前引发异常的位置结束堆栈跟踪 --- 在 System.Reactive。 PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) at System.Reactive.Stubs.<>c.<.cctor>b__2_1(Exception ex) at System.Reactive.AnonymousSafeObserver 1.OnError(Exception error) at System.Reactive.Linq.ObservableImpl.SelectMany2. .OnError(Exception error) at System.Reactive.Linq。 ObservableImpl.Where 1._.OnError(Exception error) at System.Reactive.Linq.ObservableImpl.AsObservable1.System.Reactive.Observer中的1.OnError(Exception error) at System.Reactive.Subjects.Subject.OnError(异常错误) Jantar.CodeksConnector 中的 1.OnError(异常错误)。<b__18_0>d.MoveNext()

我没主意了...

[更新:08.03]

@sellotape 为我指出了正确的方向。根据更新的第二个日志条目,堆栈跟踪清楚地表明异常是由Subject<T>.onError(ex)(我已将其删除,因为它是一个错误)重新引发的。这是一个双重错误,因为没有错误订阅者。我不知道在这种情况下会重新抛出异常,但前提是有任何订阅者,如果没有订阅者则吞下。

标签: c#sql-serverasync-awaitsystem.reactive.net-4.6.2

解决方案


虽然这不是您问题的直接答案,但您已将其标记为“System.Reactive”,所以我想我可能会(大致)向您展示您的代码的 Rx 解决方案是什么样的。请记住,我无法准确地提供代码,/// do stuff...所以我编造了。

这是处方药:

IObservable<string> query =
    from t in Observable.Interval(TimeSpan.FromMilliseconds(PollIntervalMilliseconds))
    from x in Observable.Using(
        () => new SqlConnection(ConnectionString),
        dbConnection =>
            Observable.Using(
                () =>
                {
                    var c = new SqlCommand("marc.GetEvents", dbConnection);
                    c.CommandType = CommandType.StoredProcedure;
                    c.Parameters.AddWithValue("@fromId", lastEventId);
                    return c;
                },
                command =>
                    from o in Observable.FromAsync(() => command.Connection.OpenAsync())
                    from reader in Observable.FromAsync(() => command.ExecuteReaderAsync(topToken))
                    let received = lastEventId
                    from r in Observable.FromAsync(() => reader.ReadAsync(topToken))
                    select reader.GetFieldValue<string>(0)))
    select x;

推荐阅读