node.js - 节点:将 Protobuf 消息发送到 Kafka 错误
问题描述
我正在尝试使用 HDFS kafka 连接器将 protobuf 消息从 kafka 发送到 HDFS。我的连接器配置如下所示
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1"
}
}
为了测试这一点,我试图在一个小节点应用程序中发送 protobuf 序列化消息。这是我的文件:
// data.proto
syntax = "proto3";
package awesomepackage;
message SearchRequest {
string query = 1;
int32 page = 2;
}
和我的节点应用
const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['10.8.0.1:9092']
})
const producer = kafka.producer()
const run = async () => {
await producer.connect()
protobuf.load('data.proto', async (err, root) => {
console.log("TESTING")
console.log(err)
let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
let payload = {query: "test", page: 2}
var errMsg = SearchRequest.verify(payload);
console.log(errMsg)
let msg = SearchRequest.create(payload)
var buffer = SearchRequest.encode(msg).finish();
console.log(buffer)
await producer.send({
topic: 'test-topic',
messages: [
{key: 'key1', value: buffer}
]
})
})
}
run()
但是,当我运行它时,会出现以下错误:
Failed to deserialize data for topic test-topic to Protobuf
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我该如何解决?我的猜测是我的 protobuf 模式没有在 kafka 模式注册表中注册,但我不确定。如果是这种情况,有没有办法将模式从节点发送到注册表?
解决方案
io.confluent.connect.protobuf.ProtobufConverter
需要 Schema Registry,而不是普通的序列化 Protobuf。换句话说,您在 Node 代码中缺少 Schema Registry 部分(或“包装的”Proto 消息的手动字节创建)
如果您不想使用 Schema Registry,您可以使用BlueApron Protobuf Converter,但似乎您正在使用一个,所以最好使用 Confluent 转换器
推荐阅读
- javascript - Testcafe LocalStorage 的问题
- javascript - 使用“输入”事件动态更改背景颜色
- c - 创建一个以升序存储项目或按该顺序打印的链表
- javascript - 如果第一个 JS 脚本没有产生匹配项,我如何运行第二个 JS 脚本?
- python - 我们如何在 Python 中将信号拟合到 sigmoid 函数中?
- javascript - 向所有 Firebase 云消息传递令牌 (Web) 发送通知
- streaming - Liquidsoap——读取元数据
- nativescript - Unable to open dataurl using utils.openUrl()
- python - 堆叠稀疏矩阵
- laravel - 使用 laravel 作业处理图像上传时无法解决的依赖关系