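asp.net-core - MassTransit error when trying to connect to a RabbitMQ cluster on k8s
Problem description
I have a RabbitMQ cluster on k8s and an application that uses MassTransit with RabbitMQ. When I deploy the application to k8s, MassTransit logs an error at startup. I am sure RabbitMQ is reachable, because I also added a RabbitMQ health check and it returns healthy. The service YAML and the host information are included below.
I can also see, in the RabbitMQ management UI, that the identity exchange has been created.
Startup.cs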
// Read the RabbitMQ settings from configuration.
var rabbitConfig = new RabbitMqConfig();
rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");
services.AddSingleton(rabbitConfig);

// Open a plain RabbitMQ.Client connection and register it as a singleton.
var factory = new ConnectionFactory()
{
    HostName = rabbitConfig.Host,
    Password = rabbitConfig.Password,
    UserName = rabbitConfig.UserName,
    VirtualHost = "/",
    Port = rabbitConfig.Port,
    AutomaticRecoveryEnabled = true
};
var connection = factory.CreateConnection();
services.AddSingleton<IConnection>(connection);

string connectionString = Configuration.GetValue<string>("CONNECTION_STRING");
services.AddDbContext<DataContext>(options =>
{
    options.UseMySql(connectionString);
});

services.AddHealthChecks()
    .AddMySql(connectionString, "MySQL")
    .AddRabbitMQ(name: "Rabbit");

services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Startup).Assembly);
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
        {
            h.Username(rabbitConfig.UserName);
            h.Password(rabbitConfig.Password);
        };
        // Note: rabbitConfig.Port is not passed to MassTransit here.
        cfg.Host(new Uri($"rabbitmq://{rabbitConfig.Host}"), "/", configure);
        cfg.ReceiveEndpoint("identity", e =>
        {
            // Inner parameter renamed from x to r so it does not clash with the outer lambda's x.
            e.UseMessageRetry(r => r.Interval(2, 100));
        });
    }));
});
services.AddSingleton<IHostedService, BusService>();
service.yaml
kind: Service
apiVersion: v1
metadata:
  namespace: rabbitmq
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
    - name: rabbitmq-mgmt-port
      protocol: TCP
      port: 15672
      targetPort: 15672
    - name: rabbitmq-amqp-port
      protocol: TCP
      port: 5672
      targetPort: 5672
Host:
rabbitmq.rabbitmq.svc.cluster.local
When I deploy the application to the k8s cluster, MassTransit logs the following error.
MassTransit[0] Operation interrupted: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text='INTERNAL_ERROR', classId=0, methodId=0
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext.<>c__DisplayClass19_0.b__0()
   at System.Threading.Tasks.Task`1.InnerInvoke()
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of stack trace from previous location where exception was thrown ---
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.ConfigureTopology(ModelContext context)
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func`2 setupMethod, PayloadFactory`1 payloadFactory)
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.GreenPipes.IFilter.Send(ModelContext context, IPipe`1 next)
   at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe`1 next)
   at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send(ConnectionContext context, IPipe`1 next)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at MassTransit.RabbitMqTransport.Transport.RabbitMqReceiveTransport.b__12_0()
Solution