首页 > 解决方案 > Sarama Kafka 库:如何对消费者组的 session.MarkMessage() 进行单元测试?

问题描述

我正在尝试改编 github.com/Shopify/sarama 的消费者组示例代码,但在添加单元测试时遇到了困难:我想测试 ConsumeClaim 方法中对 session.MarkMessage() 的调用(https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160)。

这是我改编的带有consume()函数的代码:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
)

// addrs lists the Kafka broker addresses handed to the sarama consumer group
// and to the producers created in the tests.
var addrs = []string{"localhost:9092"}

// topic is the Kafka topic that is consumed from and, in the tests, produced to.
var topic = "my-topic"

// main starts the consumer group, waits until consumer.ready is closed, and
// then blocks until the context is cancelled or SIGINT/SIGTERM is received.
//
// Shutdown runs in the order cancel -> wait -> close. The loop started by
// consume only exits once ctx reports an error, so closing the group before
// cancelling would leave that loop calling Consume on a closed group (which
// sarama is documented to reject, ending in log.Panicf) or leave wg.Wait
// blocked.
func main() {
    ctx, cancel := context.WithCancel(context.Background())

    var wg sync.WaitGroup
    consumer := &Consumer{ready: make(chan bool)}

    closeGroup := consume(ctx, &wg, consumer)

    // Deferred calls run last-in-first-out: cancel the context, wait for the
    // consume goroutine to finish, and only then close the consumer group.
    defer closeGroup()
    defer wg.Wait()
    defer cancel()

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
}

// consume creates a sarama consumer group named "my-group" on addrs and
// starts a goroutine, registered with wg, that repeatedly calls Consume for
// topic with the given handler.
//
// The goroutine stops once a Consume call has returned and ctx reports an
// error. Otherwise consumer.ready is replaced with a fresh channel before the
// next Consume call (Setup closes ready, so a closed channel must not be
// reused). Failing to create the group terminates the process via
// log.Fatalf; a Consume error panics via log.Panicf.
//
// The returned function closes the consumer group and panics via log.Panicf
// if closing fails.
func consume(ctx context.Context, wg *sync.WaitGroup, consumer *Consumer) (close func()) {
    cfg := sarama.NewConfig()
    // Consumer groups require at least V0_10_2_0.
    cfg.Version = sarama.V0_11_0_2
    cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

    group, err := sarama.NewConsumerGroup(addrs, "my-group", cfg)
    if err != nil {
        log.Fatalf("NewConsumerGroup: %v", err)
    }

    // run keeps the handler attached to the group until ctx is done.
    run := func() {
        defer wg.Done()
        for {
            consumeErr := group.Consume(ctx, []string{topic}, consumer)
            switch {
            case consumeErr != nil:
                log.Panicf("Consume: %v", consumeErr)
            case ctx.Err() != nil:
                return
            }
            consumer.ready = make(chan bool)
        }
    }

    wg.Add(1)
    go run()

    return func() {
        if closeErr := group.Close(); closeErr != nil {
            log.Panicf("Close: %v", closeErr)
        }
    }
}

// Consumer represents a Sarama consumer group consumer. Its Setup, Cleanup
// and ConsumeClaim methods are what consume passes to the consumer group as
// the handler.
type Consumer struct {
    // ready is closed by Setup; receivers use it to learn that the consumer
    // is up and running.
    ready  chan bool
    // handle, when non-nil, is called by ConsumeClaim with the value of each
    // claimed message. A non-nil error makes ConsumeClaim return before the
    // message is marked.
    handle func([]byte) error
}

// Setup is invoked when a new session begins, ahead of ConsumeClaim. It
// ignores the session and closes consumer.ready, which releases every
// goroutine receiving from that channel. It always returns nil.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
    close(consumer.ready) // signal: the consumer is ready
    return nil
}

// Cleanup is invoked when a session ends, after all ConsumeClaim goroutines
// have exited. It ignores the session, has nothing to release, and always
// returns nil.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim runs the consumer loop over claim.Messages() and returns nil
// once that channel is closed.
//
// Each message is logged and, if consumer.handle is set, its value is passed
// to the handler. When the handler fails, ConsumeClaim returns immediately
// with an error that wraps the handler's error, and the failing message is
// NOT marked. Otherwise the message is marked on the session with empty
// metadata.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", message.Value, message.Timestamp, message.Topic)
        if consumer.handle != nil {
            if err := consumer.handle(message.Value); err != nil {
                // %w keeps the handler's error in the chain so callers can
                // inspect it with errors.Is / errors.As.
                return fmt.Errorf("handle message %s: %w", message.Value, err)
            }
        }
        session.MarkMessage(message, "")
    }
    return nil
}

这是我为它编写的几个单元测试:

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "testing"
    "time"

    "github.com/Shopify/sarama"
    "github.com/stretchr/testify/require"
    "gotest.tools/assert"
)

// TestConsume is an integration test that needs a Kafka broker reachable at
// addrs. It produces one message to topic, starts the consumer group, waits
// for the consumer to become ready, and shuts everything down again. It makes
// no assertion about the message being received; it only checks that
// start-up and shutdown complete.
func TestConsume(t *testing.T) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer(addrs, config)
    require.NoError(t, err)
    // Release the producer's connections even when a later check aborts the test.
    defer func() {
        if err := producer.Close(); err != nil {
            t.Errorf("closing producer: %v", err)
        }
    }()

    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder([]byte("foobar")),
    })
    require.NoError(t, err)
    t.Logf("Sent message to partition %d with offset %d", partition, offset)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    var wg sync.WaitGroup

    consumer := &Consumer{ready: make(chan bool)}

    closeGroup := consume(ctx, &wg, consumer)

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    // Presumably leaves the consumer some time to pick up the message before
    // shutdown begins.
    time.Sleep(1 * time.Second)

    // Shutdown order: stop the consume loop, wait for it, then close the group.
    cancel()
    wg.Wait()
    closeGroup()
}

// TestConsumeTwice is an integration test that needs a Kafka broker reachable
// at addrs. It produces two messages to topic and asserts that the consumer's
// handler receives them in the order they were sent.
//
// Both messages share the same key, which presumably keeps them on a single
// partition so that their relative order is preserved.
func TestConsumeTwice(t *testing.T) {
    // receiveTimeout bounds how long the test waits for each message so that
    // a missing message fails the test instead of hanging it.
    const receiveTimeout = 30 * time.Second

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer(addrs, config)
    require.NoError(t, err)
    // Release the producer's connections even when a later check aborts the test.
    defer func() {
        if err := producer.Close(); err != nil {
            t.Errorf("closing producer: %v", err)
        }
    }()

    data1, data2 := "foobar1", "foobar2"
    expected := []string{data1, data2}

    for _, data := range expected {
        partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder("foobar"),
            Value: sarama.StringEncoder(data),
        })
        require.NoError(t, err)
        t.Logf("Sent message to partition %d with offset %d", partition, offset)
    }

    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    messageReceived := make(chan []byte)
    consumer := &Consumer{
        ready: make(chan bool),
        handle: func(data []byte) error {
            // Give up on the hand-off once the test is shutting down;
            // otherwise a message arriving after the test stopped receiving
            // would block ConsumeClaim forever and deadlock wg.Wait.
            select {
            case messageReceived <- data:
            case <-ctx.Done():
                return ctx.Err()
            }
            fmt.Printf("Received message: %s\n", data)
            return nil
        },
    }

    closeGroup := consume(ctx, &wg, consumer)
    // Deferred calls run last-in-first-out: cancel the context, wait for the
    // consume goroutine, then close the group. Using defer keeps the shutdown
    // intact when an assertion aborts the test.
    defer closeGroup()
    defer wg.Wait()
    defer cancel()

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    for _, want := range expected {
        select {
        case data := <-messageReceived:
            assert.Equal(t, want, string(data))
        case <-time.After(receiveTimeout):
            t.Fatalf("timed out after %v waiting for message %q", receiveTimeout, want)
        }
    }
}

在 Docker 容器中启动 Kafka 和 Zookeeper 之后(例如使用 johnnypark/kafka-zookeeper 镜像),即可运行这些测试:

docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1  -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper

我遇到的困难是:如果我注释掉下面这一行

        session.MarkMessage(message, "")

测试仍然通过。根据 https://godoc.org/github.com/Shopify/sarama#ConsumerGroupSession 的说明,MarkMessage 会将消息标记为已消费,但我该如何在单元测试中验证这一点?

标签: go apache-kafka sarama

解决方案


推荐阅读