c# - 如何在同一个集合的多个 MongoDB 观察者之间分配工作负载?
问题描述
我想知道是否可以为一个集合启动多个观察者?
在我的例子中,我启动了多个线程来监视一个集合,但是 mongo 将相同的文档返回给所有观察者。
如何让 MongoDB 在这些启动的线程之间拆分文档负载?
线程启动器:
for (var i = 0; i < 4; i++)
{
ThreadMongoDB thWork = new ThreadMongoDB(_config, _serviceProvider);
thWork.ThreadName = "TH-" + i.ToString();
Thread th = new Thread(thWork.Process);
await Task.Run(() =>
{
th.Start();
});
}
ThreadMongoDB 类中的代码:
public class ThreadMongoDB
{
readonly IMongoDatabase _db;
readonly Executer _exec;
public string ThreadName { get; set; }
public ThreadMongoDB(IConfiguration iconfig, IServiceProvider serviceProvider)
{
var typeConn = iconfig["TypeConn"];
var client = new MongoClient(iconfig[$"{typeConn}:cnnMain"]);
_db = client.GetDatabase(iconfig[$"{typeConn}:dbMain"]);
_exec = new Executer(iconfig, serviceProvider);
}
public void Process()
{
var options = new ChangeStreamOptions() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<WorkerQueue>>()
.Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] }}");
//.Match("{ $or: [ {operationType: 'replace' }, { operationType: 'insert' }, { operationType: 'update' } ] }");
using (var pilotQueueStream = _db.GetCollection<WorkerQueue>("one_queue_pilot")
.Watch(pipeline, options).ToEnumerable().GetEnumerator())
{
while (true)
{
pilotQueueStream.MoveNext();
WorkerQueue currentReg = pilotQueueStream.Current.FullDocument;
if (currentReg != null)
{
if (!currentReg.boolDone)
{
_exec.Execute(ThreadName, currentReg);
}
}
}
}
}
}
}
解决方案
我想知道是否可以为一个集合启动多个观察者?
在我的例子中,我启动了多个线程来监视一个集合,但是 mongo 将相同的文档返回给所有观察者。
这个结果是意料之中的:您可以有多个活动的变更流,但每个流都是独立的,并且观察者之间没有协调。观看相同的流将导致相同的事件。
如何让 MongoDB 在这些启动的线程之间拆分文档负载?
必须在您的应用程序中处理此要求。
如果您的观察者正在对更改流事件进行大量处理并且您希望分配工作负载,则需要考虑的几种方法是:
让变更流观察者将事件推送到中央工作/作业队列,以便多个工作线程可以从队列中声明和处理作业。这是典型的批处理模式。
让每个观察者都包含一个
$match
阶段,该阶段过滤要处理的更改流事件的不同子集。这种方法需要一些计划以确保事件不重叠,并且如果某些事件类型需要比其他类型更多的工作人员,则可扩展性较差。
在实施任何一种方法之前,绝对值得确认您的工作负载需要更多的工人/线程。
推荐阅读
- c++ - 如何从数字中获取 Unicode 字符?
- java - 在 AWS Lambda Java 中解析 Kinesis 数据流
- r - 在(反)对角线上应用函数
- javascript - 引导程序中的 net::ERR_ABORTED 500(内部服务器错误)
- batch-file - Windows 批处理文件 xcopy 同一文件夹中的特定文件
- angular - 为什么 ng serve 在进程监视器中显示多次
- javascript - 如何使用“toHaveBeenCalledWith”在 Jasmine 中断言布尔值
- compiler-errors - 在 Ubuntu 中构建 Code Composer Studio 嵌入式项目(带有指向外部项目的链接)时找不到头文件错误
- javascript - 如何使用 arguments.length 来查找传递给函数的参数数量?
- spring-boot - docker 化 maven 项目时出错。构建 Dockerfile 时出错。怎么能解决呢?