首页 > 解决方案 > golang rabbitmq channel.consume SIGSEGV

问题描述

伙计们,我正在尝试为 rabbitmq 实现编写一个包装库。出于某种奇怪的原因,我无法从现有队列中消费。

msgs, err := w.AMQP.Channel.Consume(导致:

2019-10-14T13:58:56.462-0400    info    worker/worker.go:27     [Initializing NewWorker]
2019-10-14T13:58:56.462-0400    debug   worker/worker.go:43     Starting Worker
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x28 pc=0x14d024e]

我遵循的文档示例很简单,rabbitmq 驱动程序的接口也很简单。我不明白为什么我会invalid memory address出错。

我的实现与文章的布局相同,供参考,这是我拥有的所有代码:

// StartConsumingTlcFHVDrivers subscribes to queue and starts to do it's job
func (w *Worker) StartConsuming() {
    queueName := w.options.Rabbit.AMQP.QueueName
    w.logger.Debugf("Starting %s Worker", queueName)

    msgs, err := w.AMQP.Channel.Consume(
        queueName,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        logger.Fatalf("Could not register consumer, err:%s", err)
    }

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            w.logger.Infof("Received a message: %s", d.Body)
        }
    }()

    w.logger.Infof("Waiting for messages on %s", queueName)
    <-forever
}

在我用自己的界面完成包装之前,我以这种方式设置客户端:

import ""github.com/streadway/amqp""

type Client struct {
    options Options
    logger  ilogger.Logger
    metrics imetrics.Client
    Channel amqp.Channel
}

func NewRabbitClient(logger ilogger.Logger, metrics imetrics.Client, options Options) *Client {
    var err error
    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }
    defer conn.Close()

    // create a dedicated channel per queue
    channel, err := conn.Channel()
    if err != nil {
        logger.Fatalf("%s: %s", "Can't create a amqpChannel", err)
    }
    defer channel.Close()

    // declare an Exchange
    err = channel.ExchangeDeclare(options.ExchangeName, options.ExchangeType, true, false, false, false, nil)

    // declare a Queue
    _, err = channel.QueueDeclare(options.QueueName, true, false, false, false, nil)
    if err != nil {
        logger.Fatalf("Could not declare %s queue, err:%s", options.QueueName, err)
    }

    // bind a Queue to the Exchange
    err = channel.QueueBind(options.QueueName, "", options.ExchangeName, false, nil)

    // Qos on the Channel
    err = channel.Qos(1, 0, false)
    if err != nil {
        logger.Fatalf("Could not configure QoS, err:%s", err)
    }

    return &Client{
        metrics: metrics,
        logger:  logger,
        options: options,
        Channel: *channel,
    }
}

标签: gorabbitmq

解决方案


这个错误现在对我来说很明显。在构造函数中,不要关闭连接/通道。IE

    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }

代替

    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }
    defer conn.Close()
    ```

推荐阅读