首页 > 解决方案 > 如何在同一个集合的多个 MongoDB 观察者之间分配工作负载?

问题描述

我想知道是否可以为一个集合启动多个观察者?

在我的例子中,我启动了多个线程来监视一个集合,但是 mongo 将相同的文档返回给所有观察者。

如何让 MongoDB 在这些启动的线程之间拆分文档负载?

线程启动器:

    for (var i = 0; i < 4; i++)
    {
                ThreadMongoDB thWork = new ThreadMongoDB(_config, _serviceProvider);
                thWork.ThreadName = "TH-" + i.ToString();

                Thread th = new Thread(thWork.Process);

                await Task.Run(() =>
                {
                    th.Start();
                });
    }

ThreadMongoDB 类中的代码:

public class ThreadMongoDB
    {
        readonly IMongoDatabase _db;
        readonly Executer _exec;

        public string ThreadName { get; set; }


        public ThreadMongoDB(IConfiguration iconfig, IServiceProvider serviceProvider)
        {
            var typeConn = iconfig["TypeConn"];
            var client = new MongoClient(iconfig[$"{typeConn}:cnnMain"]);
            _db = client.GetDatabase(iconfig[$"{typeConn}:dbMain"]);

            _exec = new Executer(iconfig, serviceProvider);
        }

        public void Process()
        {
            var options = new ChangeStreamOptions() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<WorkerQueue>>()
                .Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] }}");
                //.Match("{ $or: [ {operationType: 'replace' }, { operationType: 'insert' }, { operationType: 'update' } ] }");

            using (var pilotQueueStream = _db.GetCollection<WorkerQueue>("one_queue_pilot")
                .Watch(pipeline, options).ToEnumerable().GetEnumerator())
            {
                while (true)
                {
                    pilotQueueStream.MoveNext();

                    WorkerQueue currentReg = pilotQueueStream.Current.FullDocument;

                    if (currentReg != null)
                    {
                        if (!currentReg.boolDone)
                        {
                           _exec.Execute(ThreadName, currentReg);
                        }
                    }
                }
            }
        }
    }
}

标签: c#mongodb

解决方案


我想知道是否可以为一个集合启动多个观察者?

在我的例子中,我启动了多个线程来监视一个集合,但是 mongo 将相同的文档返回给所有观察者。

这个结果是意料之中的:您可以有多个活动的变更流,但每个流都是独立的,并且观察者之间没有协调。观看相同的流将导致相同的事件。

如何让 MongoDB 在这些启动的线程之间拆分文档负载?

必须在您的应用程序中处理此要求。

如果您的观察者正在对更改流事件进行大量处理并且您希望分配工作负载,则需要考虑的几种方法是:

  • 让变更流观察者将事件推送到中央工作/作业队列,以便多个工作线程可以从队列中声明和处理作业。这是典型的批处理模式。

  • 让每个观察者都包含一个$match阶段,该阶段过滤要处理的更改流事件的不同子集。这种方法需要一些计划以确保事件不重叠,并且如果某些事件类型需要比其他类型更多的工作人员,则可扩展性较差。

在实施任何一种方法之前,绝对值得确认您的工作负载需要更多的工人/线程。


推荐阅读