首页 > 解决方案 > 通过 golang 将 avro 格式的消息推送到 kafka

问题描述

我试图通过 confluent go 客户端向 kafka 推送一些消息,但问题是消息需要以 avro 格式推送。在 java springboot 应用程序中也可以轻松实现。

我有一种预感,好像这一切都可以通过 confluent go 客户端实现。虽然我有另一种方法可以通过融合的休息代理推送这些消息,但这意味着 3-4 倍的性能损失,我会拒绝这样做。

我尝试用 goAvro 转换 avro 中的消息。虽然我在生产时没有收到任何错误,但数据部分没有以 avro 格式存储。 在此处输入图像描述

avroCodec, err := goavro.NewCodec(schemaString)

if err != nil {
    log.Panic(err.Error())
}

appointmentByte,_ := json.Marshal(appointment)

native, _, _ := avroCodec.NativeFromTextual(appointmentByte)

binaryValue, _ := avroCodec.BinaryFromNative(nil,  native)

var recordValue []byte

schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(id))

recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, binaryValue...)

log.Print(recordValue)

key, _ := uuid.NewUUID()

fmt.Print(key.String())
p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic: &topic, Partition: kafka.PartitionAny},
    Key: []byte(key.String()), Value: recordValue}, nil)

标签: goapache-kafkaavroconfluent-schema-registry

解决方案


你可以在 Github 上搜索你的问题的解决方案。它目前不是项目的一部分,但正在开发中

https://github.com/confluentinc/confluent-kafka-go/issues/69


推荐阅读