go - Librdkafka Go 消费者和 websocket 问题
问题描述
我正在尝试编写一些使用 librdkafka Kafka Consumer 的简单 Go 代码,使用 Kafka 主题上的消息并将该消息发布到 HTTP(升级到 websocket)端点。类似于以下内容
Kafka Topic(myTopic) --> Go --> 客户端使用 HTTP 但升级到 Websockets 并保持开放连接并通过 Go 代码使用来自 Kafka 主题的所有消息。这是我到目前为止所尝试的。虽然我能够使用有关该主题的消息以及单独测试我的 websockets 代码,但我无法将两者拼接在一起。我无法弄清楚如何将 Kafka 消息传递给升级的 websocket 连接。
在这个阶段,我将所有代码保存在一个文件中,这违反了最佳实践。一旦我得到它的工作,我将重构代码。
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/websocket"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func reader(conn *websocket.Conn) {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
log.Println(string(p))
if err := conn.WriteMessage(messageType, p); err != nil {
log.Println(err)
return
}
}
//function that will be triggered with the / handler
func homePage(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Home Page")
}
//function that will be triggered with the /ws handler
func wsEndpoint(w http.ResponseWriter, r *http.Request) {
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
}
log.Println("Client Succesfully connected")
reader(ws)
}
//the handlers are initialized when the code is run.
func setupRoutes() {
http.HandleFunc("/", homePage)
http.HandleFunc("/ws", wsEndpoint)
}
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
//Subscribe to "myTopic". Any message produced on myTopic will be consumed by this KafkaConsumer
c.SubscribeTopics([]string{"myTopic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
setupRoutes()
log.Fatal(http.ListenAndServe(":8081", nil))
}
解决方案
推荐阅读
- java - Hibernate原生查询方法
- php - 如何让所有值与 mysqli_fetch_assoc 相呼应?
- angular - Angular - 未捕获的错误:AdminLTE 在 adminlte.min.js:14 需要 jQuery
- c++ - 使用没有标准库的预处理器连接 C++ 源文件包括
- python-3.x - 信息在 .csv 中重复代码超过五次
- javascript - 如何修复基本的 javascript 计算器不显示小数点
- swagger - Swagger PreAuthorize 不记名令牌
- ruby-on-rails - 使用 rails 条带化付款意图
- flask - Backbone.js 下划线模板渲染问题
- javascript - 文件中的字体在调试期间不加载