首页 > 解决方案 > Rebus 中的竞争消费者是否可以使用 InMemory Transport?

问题描述

是否可以使用 Rebus InMemory Transport来实现竞争消费者,例如示例 RabbitScaleout?

背景:我有一个单体应用程序,我必须分解和扩展某些部分。我们必须维护单体部署,同时进行容器/云部署。

我的意图是重构,使用 Rebus 来协调服务之间的协调,使用 InMemoryTransport 来处理单体应用,使用 RabbitMQ 来处理容器化服务。这行得通吗?

下面的代码导致两个处理程序都被调用。

class Program
    {
        static void Main(string[] args)
        {
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Register(() => new Handler1());
                activator.Register(() => new Handler2());

                Configure.With(activator)
                    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "queue"))
                    .Logging( t=> t.None())
                    .Routing( r=> r.TypeBased().Map<string>("queue"))
                    .Start();


                activator.Bus.Send("test");

                Thread.Sleep(1000);
                    
            }
        }
    }


    class Handler1 : IHandleMessages<String>
    {
        public async Task Handle(String message)
        {
            Console.WriteLine("Handler1 " + message);
        }
    }

    class Handler2 : IHandleMessages<String>
    {
        public async Task Handle(string message)
        {
            Console.WriteLine("Handler2 " + message);
        }
    }

标签: rebus

解决方案


内存中传输当然可以用于竞争消费者的场景。

诀窍是将相同的InMemNetwork实例传递给具有相同输入队列名称的两个总线实例——然后它们将在所有实例之间分发消息。

// buses will be connected on this network
var network = new InMemNetwork();

using (var activator1 = new BuiltinHandlerActivator())
using (var activator2 = new BuiltinHandlerActivator())
{
    // when we're doing "competing consumers", we should probably execute
    // the same logic – therefore, we register the same handler twice here:
    activator1.Register(() => new Handler1());
    activator2.Register(() => new Handler1());

    // start bus 1
    Configure.With(activator1)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // start bus 2
    Configure.With(activator2)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // always AWAIT here
    await activator1.Bus.Send("test");
    //
    // - if that's not possible, use the sync bus:
    activator1.Bus.Advanced.SyncBus.Send("test");

    Thread.Sleep(1000);
}

虽然这会起作用,但我不确定我是否会推荐它 - 内存中传输,由于在内存中,不提供任何类型的持久性,因此如果InMemNetwork实例,您将丢失所有队列中的所有消息丢失(例如由于进程重新启动或其他原因)。

此外,如果意图是并行化消息处理,我不确定您是否希望这样做,因为总线将在同一个进程中运行,因此竞争相同的资源 - 所以这样做的效果只会简单是另一种并行性......你可以更容易地并行化消息处理

a) 增加并行度

Configure.With(...)
    .(...)
    .Options(o => o.SetMaxParallelism(100))
    .Start();

(这里可以并行处理多达 100 条消息,如果它们的工作是Task基于 - 通常是 I/O 绑定的东西),

或 b) 增加工作线程的数量

Configure.With(...)
    .(...)
    .Options(o => o.SetNumberOfWorkers(10))
    .Start();

(如果您的传输没有Task基于 - 的 API,这主要是有意义的)(此处将 Rebus 配置为生成 10 个专用线程用于消息处理)。


推荐阅读