首页 > 解决方案 > 尝试在 k8s 上连接 rabbitmq 集群时出现 Masstransit 错误

问题描述

我在 k8s 上有 rabbitmq 集群。我有一个使用 MassTransit 和 rabbitmq 的应用程序。当我在启动时将我的应用程序部署到 k8s 时,MassTransit 写入错误。我确信rabbitmq 是可以访问的,因为我还添加了返回健康的rabbitmq 健康检查。我还分享了服务 yaml 和主机信息。

我还可以查看从 rabbitmq 管理 ui 创建的身份交换

启动.cs

        var rabbitConfig = new RabbitMqConfig();
        rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
        rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
        rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
        rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");

        services.AddSingleton(rabbitConfig);

        var factory = new ConnectionFactory()
        {
            HostName = rabbitConfig.Host,
            Password = rabbitConfig.Password,
            UserName = rabbitConfig.UserName,
            VirtualHost = "/",
            Port = rabbitConfig.Port,
            AutomaticRecoveryEnabled = true
        };

        var connection = factory.CreateConnection();
        services.AddSingleton<IConnection>(connection);

        string connectionString =  Configuration.GetValue<string>("CONNECTION_STRING");

        services.AddDbContext<DataContext>(options =>
        {
            options.UseMySql(connectionString);
        }); 

        services.AddHealthChecks()
            .AddMySql(connectionString, "MySQL")
            .AddRabbitMQ(name: "Rabbit");

        services.AddMassTransit( x=> {   

            x.AddConsumers(typeof(Startup).Assembly);

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => {
                Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
                {
                    h.Username(rabbitConfig.UserName);
                    h.Password(rabbitConfig.Password);
                };
                cfg.Host(new Uri($"rabbitmq://{rabbitConfig.Host}"),"/", configure);
                cfg.ReceiveEndpoint("identity", e=> {
                    e.UseMessageRetry(x => x.Interval(2, 100));                    
                });
            }));
        }); 

        services.AddSingleton<IHostedService, BusService>(); 

服务.yaml

kind: Service
apiVersion: v1
metadata:
  namespace:  rabbitmq
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
   - name: rabbitmq-mgmt-port
     protocol: TCP
     port: 15672
     targetPort: 15672
   - name: rabbitmq-amqp-port
     protocol: TCP
     port: 5672
     targetPort: 5672

主持人:

rabbitmq.rabbitmq.svc.cluster.local

当我将我的应用程序部署到 k8s 集群时,MassTransit 写入如下错误。

MassTransit[0] 操作中断:RabbitMQ.Client.Exceptions.OperationInterruptedException:AMQP 操作被中断:AMQP 关闭原因,由 Peer 发起,代码 = 541,文本 ='INTERNAL_ERROR',类 ID = 0,方法 ID = 0 在 RabbitMQ。在 MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext 的 RabbitMQ.Client.Impl.ModelBase.QueueDeclare 的 Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan 超时)(字符串队列,布尔被动,布尔持久,布尔独占,布尔自动删除,IDictionary 2 arguments) at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary2 参数)。 <>c__DisplayClass19_0.b__0() at System.Threading.Tasks.Task 1.InnerInvoke() at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) --- End of stack trace from previous location where exception was thrown --- at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread) --- End of stack trace from previous location where exception was thrown --- at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter1.ConfigureTopology(ModelContext context) at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter 1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func2 setupMethod, PayloadFactory1 payloadFactory) at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter1.GreenPipes.IFilter.Send(ModelContext context, IPipe 1 next) at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe1 next) at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send(ConnectionContext context, IPipe 1 next) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send(IPipe 1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send (IPipe 1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send(IPipe`1 管道,CancellationToken cancelToken)在 MassTransit.RabbitMqTransport.Transport.RabbitMqReceiveTransport.b__12_0()

标签: asp.net-corekubernetesrabbitmqmasstransit

解决方案


推荐阅读