首页 > 解决方案 > Librdkafka Go 消费者和 websocket 问题

问题描述

我正在尝试编写一些使用 librdkafka Kafka Consumer 的简单 Go 代码,使用 Kafka 主题上的消息并将该消息发布到 HTTP(升级到 websocket)端点。类似于以下内容

Kafka Topic(myTopic) --> Go --> 客户端使用 HTTP 但升级到 Websockets 并保持开放连接并通过 Go 代码使用来自 Kafka 主题的所有消息。这是我到目前为止所尝试的。虽然我能够使用有关该主题的消息以及单独测试我的 websockets 代码,但我无法将两者拼接在一起。我无法弄清楚如何将 Kafka 消息传递给升级的 websocket 连接。

在这个阶段,我将所有代码保存在一个文件中,这违反了最佳实践。一旦我得到它的工作,我将重构代码。


import (
    "fmt"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

func reader(conn *websocket.Conn) {
    messageType, p, err := conn.ReadMessage()

    if err != nil {
        log.Println(err)
        return
    }

    log.Println(string(p))

    if err := conn.WriteMessage(messageType, p); err != nil {
        log.Println(err)
        return
    }

}

//function that will be triggered with the / handler
func homePage(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Home Page")
}

//function that will be triggered with the /ws handler
func wsEndpoint(w http.ResponseWriter, r *http.Request) {
    upgrader.CheckOrigin = func(r *http.Request) bool { return true }

    ws, err := upgrader.Upgrade(w, r, nil)

    if err != nil {
        log.Println(err)
    }

    log.Println("Client Succesfully connected")
    reader(ws)

}

//the handlers are initialized when the code is run.
func setupRoutes() {
    http.HandleFunc("/", homePage)
    http.HandleFunc("/ws", wsEndpoint)
}

func main() {

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    //Subscribe to "myTopic". Any message produced on myTopic will be consumed by this KafkaConsumer
    c.SubscribeTopics([]string{"myTopic"}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))

        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

    c.Close()

    setupRoutes()
    log.Fatal(http.ListenAndServe(":8081", nil))
}

标签: golibrdkafka

解决方案


推荐阅读