azure-cosmosdb - Cosmo ChangeFeed - 错误、异常和服务失败场景的
问题描述
全部,
我正在使用更改馈送处理器库。想知道处理服务失败的最佳方法以及 ProcessChangesAsync 方法中的异常/错误场景。以下是我所指的事件。
1) 服务失败 - 让处理器库在某些操作过程中崩溃的服务。如何从同一个文档(失败实例的文档)开始流程?是否有任何内置机制可以让更改提要从最后一个失败的文档开始?例如,假设,在当前批次中,我们成功处理了 10 个文档。5 个,然后由于网络故障或其他原因导致服务中断。重新启动服务后,我的流程会从第 6 个文档开始吗?如何做到这一点?
2) 异常和错误 - ProcessChangesAsync 方法中的任何错误都可以在全局级别使用 try catch 进行处理,但是如何保留这些失败记录并使它们可用于下一批?同样,在更改馈送过程中寻找任何可用的内置机制。
解决方案
1) 默认情况下,处理器库在成功运行ProcessChangesAsync
. 在最新的库版本中,您可以自定义 Checkpointer 以在需要时执行手动检查点。如果由于某种原因处理器在检查点之前关闭,那么它将从存储在 Leases 集合中的最后一个成功的检查点开始下一个处理。在您的情况下,它将再次从第一个文档开始,因此您永远不会丢失更改,但您可能会经历双重处理(这是“至少一次”模型)。
2) 没有您可以利用的内置机制,在其中处理异常ProcessChangesAsync
是您的责任。您不仅可以添加全局 try/catch,而且如果您正在遍历文档,请在循环内添加 try/catch 以处理失败的文档(可能将其发送到队列以供以后分析/后处理)不丢失批次。如果您需要记录这些错误(我假设这就是您所说的持续错误的意思?),那么最新版本与LibLog兼容,因此插入您自己的自定义日志记录非常简单:
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;
var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider(); //You can use any provider supported by LibLog
using (tracelogProvider.OpenNestedContext(hostName))
{
LogProvider.SetCurrentLogProvider(tracelogProvider);
// After this, create IChangeFeedProcessor instance and start/stop it.
}
评论的额外信息
为避免异常暂停批处理或导致重新处理批处理,您可以进行如下处理:
public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
{
try
{
foreach(var document in documents)
{
try
{
// Do your work for the document
}
catch(Exception ex)
{
// Something happened with the current document, handle it, send it to a queue / another storage to analyze, log it. This catch will make the loop continue with the next.
}
}
}
catch(Exception ex)
{
// Something unhandled happened, log it and avoid throwing it again so the next batch is processed
}
}
推荐阅读
- shell - 我需要在 shell 中用换行符替换文字 \n
- git - 我将如何 git 合并两个具有原始祖先但分歧的相同项目?
- c# - 仍然没有得到如何从 BackgroundWorker 返回值
- html - 我在 bootstrap 列中遇到 img 缩放问题
- node.js - 猫鼬删除错误的文件
- java - 遍历 HashMap 并将值放入 List
- javascript - 异步减少
- python-3.x - 如何在使用 boto3 创建 api-gateway 时添加命令“使用 Lambda 代理集成”
- java - 在测试中调用 SDOHelper.createRootDataObject 时的 NPE
- spring - 从之前的步骤获取xml配置spring批处理中的jobExecutionContext