首页 > 解决方案 > Cosmo ChangeFeed - 错误、异常和服务失败场景的

问题描述

全部,

我正在使用更改馈送处理器库。想知道处理服务失败的最佳方法以及 ProcessChangesAsync 方法中的异常/错误场景。以下是我所指的事件。

1) 服务失败 - 让处理器库在某些操作过程中崩溃的服务。如何从同一个文档(失败实例的文档)开始流程?是否有任何内置机制可以让更改提要从最后一个失败的文档开始?例如,假设,在当前批次中,我们成功处理了 10 个文档。5 个,然后由于网络故障或其他原因导致服务中断。重新启动服务后,我的流程会从第 6 个文档开始吗?如何做到这一点?

2) 异常和错误 - ProcessChangesAsync 方法中的任何错误都可以在全局级别使用 try catch 进行处理,但是如何保留这些失败记录并使它们可用于下一批?同样,在更改馈送过程中寻找任何可用的内置机制。

标签: azure-cosmosdb

解决方案


1) 默认情况下,处理器库在成功运行ProcessChangesAsync. 在最新的库版本中,您可以自定义 Checkpointer 以在需要时执行手动检查点。如果由于某种原因处理器在检查点之前关闭,那么它将从存储在 Leases 集合中的最后一个成功的检查点开始下一个处理。在您的情况下,它将再次从第一个文档开始,因此您永远不会丢失更改,但您可能会经历双重处理(这是“至少一次”模型)。

2) 没有您可以利用的内置机制,在其中处理异常ProcessChangesAsync是您的责任。您不仅可以添加全局 try/catch,而且如果您正在遍历文档,请在循环内添加 try/catch 以处理失败的文档(可能将其发送到队列以供以后分析/后处理)不丢失批次。如果您需要记录这些错误(我假设这就是您所说的持续错误的意思?),那么最新版本与LibLog兼容,因此插入您自己的自定义日志记录非常简单:

using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider(); //You can use any provider supported by LibLog
using (tracelogProvider.OpenNestedContext(hostName))
{
    LogProvider.SetCurrentLogProvider(tracelogProvider);
    // After this, create IChangeFeedProcessor instance and start/stop it.
}

来源

评论的额外信息

为避免异常暂停批处理或导致重新处理批处理,您可以进行如下处理:

public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
{
    try
    {
        foreach(var document in documents)
        {
            try
            {
                // Do your work for the document
            }
            catch(Exception ex)
            {
                // Something happened with the current document, handle it, send it to a queue / another storage to analyze, log it. This catch will make the loop continue with the next.
            }

        }
    }
    catch(Exception ex)
    {
        // Something unhandled happened, log it and avoid throwing it again so the next batch is processed    
    }
}

推荐阅读