rebus - Rebus 中的竞争消费者是否可以使用 InMemory Transport?
问题描述
是否可以使用 Rebus InMemory Transport来实现竞争消费者,例如示例 RabbitScaleout?
背景:我有一个单体应用程序,我必须分解和扩展某些部分。我们必须维护单体部署,同时进行容器/云部署。
我的意图是重构,使用 Rebus 来协调服务之间的协调,使用 InMemoryTransport 来处理单体应用,使用 RabbitMQ 来处理容器化服务。这行得通吗?
下面的代码导致两个处理程序都被调用。
class Program
{
static void Main(string[] args)
{
using (var activator = new BuiltinHandlerActivator())
{
activator.Register(() => new Handler1());
activator.Register(() => new Handler2());
Configure.With(activator)
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "queue"))
.Logging( t=> t.None())
.Routing( r=> r.TypeBased().Map<string>("queue"))
.Start();
activator.Bus.Send("test");
Thread.Sleep(1000);
}
}
}
class Handler1 : IHandleMessages<String>
{
public async Task Handle(String message)
{
Console.WriteLine("Handler1 " + message);
}
}
class Handler2 : IHandleMessages<String>
{
public async Task Handle(string message)
{
Console.WriteLine("Handler2 " + message);
}
}
解决方案
内存中传输当然可以用于竞争消费者的场景。
诀窍是将相同的InMemNetwork
实例传递给具有相同输入队列名称的两个总线实例——然后它们将在所有实例之间分发消息。
// buses will be connected on this network
var network = new InMemNetwork();
using (var activator1 = new BuiltinHandlerActivator())
using (var activator2 = new BuiltinHandlerActivator())
{
// when we're doing "competing consumers", we should probably execute
// the same logic – therefore, we register the same handler twice here:
activator1.Register(() => new Handler1());
activator2.Register(() => new Handler1());
// start bus 1
Configure.With(activator1)
.Transport(t => t.UseInMemoryTransport(network, "queue"))
.Logging( t=> t.None())
.Routing( r=> r.TypeBased().Map<string>("queue"))
.Start();
// start bus 2
Configure.With(activator2)
.Transport(t => t.UseInMemoryTransport(network, "queue"))
.Logging( t=> t.None())
.Routing( r=> r.TypeBased().Map<string>("queue"))
.Start();
// always AWAIT here
await activator1.Bus.Send("test");
//
// - if that's not possible, use the sync bus:
activator1.Bus.Advanced.SyncBus.Send("test");
Thread.Sleep(1000);
}
虽然这会起作用,但我不确定我是否会推荐它 - 内存中传输,由于在内存中,不提供任何类型的持久性,因此如果InMemNetwork
实例,您将丢失所有队列中的所有消息丢失(例如由于进程重新启动或其他原因)。
此外,如果意图是并行化消息处理,我不确定您是否希望这样做,因为总线将在同一个进程中运行,因此竞争相同的资源 - 所以这样做的效果只会简单是另一种并行性......你可以更容易地并行化消息处理
a) 增加并行度
Configure.With(...)
.(...)
.Options(o => o.SetMaxParallelism(100))
.Start();
(这里可以并行处理多达 100 条消息,如果它们的工作是Task
基于 - 通常是 I/O 绑定的东西),
或 b) 增加工作线程的数量
Configure.With(...)
.(...)
.Options(o => o.SetNumberOfWorkers(10))
.Start();
(如果您的传输没有Task
基于 - 的 API,这主要是有意义的)(此处将 Rebus 配置为生成 10 个专用线程用于消息处理)。
推荐阅读
- lambda - lisp 错误:“应该是 lambda 表达式”
- ios - 使用电容器在iOS应用构建中滚动时如何防止内容显示在安全区域上方?
- android - 如何在布局android中匹配高度和宽度1:1
- python - 多个进程是否可以同时仅从 Python 中的文件读取(而不是写入)?
- javascript - 使用JS将对象数组转换为包含具有相同值的对象的数组数组?
- java - 从 HTML 中查找 JSON 字符串
- excel - 将字典键与工作表单元格值匹配
- sql - COUNT DISTINCT WITH CONDITION 和 GROUP BY
- json - spring boot返回转义的json
- database - 如何在现有 MongoDB 文档中插入对象数组/列表?