go - 通过 golang 将 avro 格式的消息推送到 kafka
问题描述
我试图通过 confluent go 客户端向 kafka 推送一些消息,但问题是消息需要以 avro 格式推送。在 java springboot 应用程序中也可以轻松实现。
我有一种预感,好像这一切都可以通过 confluent go 客户端实现。虽然我有另一种方法可以通过融合的休息代理推送这些消息,但这意味着 3-4 倍的性能损失,我会拒绝这样做。
我尝试用 goAvro 转换 avro 中的消息。虽然我在生产时没有收到任何错误,但数据部分没有以 avro 格式存储。
avroCodec, err := goavro.NewCodec(schemaString)
if err != nil {
log.Panic(err.Error())
}
appointmentByte,_ := json.Marshal(appointment)
native, _, _ := avroCodec.NativeFromTextual(appointmentByte)
binaryValue, _ := avroCodec.BinaryFromNative(nil, native)
var recordValue []byte
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(id))
recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, binaryValue...)
log.Print(recordValue)
key, _ := uuid.NewUUID()
fmt.Print(key.String())
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key.String()), Value: recordValue}, nil)
解决方案
你可以在 Github 上搜索你的问题的解决方案。它目前不是项目的一部分,但正在开发中
https://github.com/confluentinc/confluent-kafka-go/issues/69
推荐阅读
- python - 用于字典转换的特殊方法名称的内置函数
- sql-server - SQL Server Profiler - 奇怪的持续时间,StartTime 大于 EndTime
- c# - 在 dotnet core 2.0 和 angular 2+ 中访问当前用户的下载文件夹
- javascript - 是否可以使用 vanilla js 在窗口范围内使用 clientX 和 clientY 触发单击按钮
- c# - FluentAssertions Should().BeEquivalentTo() 失败,列表包含从相同接口派生的运行时指定类型
- javascript - 无法在“节点”上执行“包含”:参数 1 不是“节点”类型
- c# - c# - itemsource 文本到超链接
- javascript - Webpack 以错误的顺序捆绑了 Bootstrap 相关的 Javascript 文件
- javascript - 从数据库向 GoogleMaps 添加标记会导致位置 1 的 JSON 中出现意外的标记 o
- ubuntu - 如何检查 ubuntu 包是否已签名