apache-kafka - 通过 Kafka REST 将 AVRO 消息发布到 Kafka
问题描述
我们已经在 Kubernetes 集群中部署了 Confluent Platform 6.0。我通过 Kafka REST api 创建了一个 Kafka 主题“test-topic-1”。现在我正在尝试向该主题发布一条简单的 AVRO 消息。
curl --location --request POST 'https://kafka-rest-master.k8s.hip.com.au/topics/test-topic-1' \
--header 'Content-Type: application/vnd.kafka.avro.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{"value_schema":{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]},"records":[{"value":{"name":"testUser"}}]}'
我收到此请求的 500 错误响应,
{"error_code":500,"message":"Internal Server Error"}
当我检查 kafka rest pod 的日志时,我可以看到以下错误,
错误请求失败并出现异常(io.confluent.rest.exceptions.DebuggableExceptionMapper)com.fasterxml.jackson.databind.exc.MismatchedInputException:无法反序列化
java.lang.String
[来源:(org.glassfish.jersey.message.internal .ReaderInterceptorExecutor$UnCloseableInputStream); 行:1,列:17](通过参考链:io.confluent.kafkarest.entities.v2.SchemaTopicProduceRequest["value_schema"])在 com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java: 59) 在 com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1445)
我是否按照正确的步骤将 AVRO 消息发布到新创建的 Kafka 主题?如果是这样,这可能是什么问题?
解决方案
您的 AVRO 模式定义是错误的。应该定义如下
{
"type": "record",
"name": "recordName",
"namespace": "namespace",
"doc": "description",
"fields": [
{
"name": "key",
"type": {
"type": "string",
"avro.java.string": "String"
}
}
]
}
或者
"fields": [
{
"name": "fiedName",
"type": "string"
},
]
推荐阅读
- python - 如果python“逐行”运行文件,它如何在定义之前使用函数?
- leaflet - 如何在 Leaflet 的 TileLayer 上定义起始网格位置?
- reactjs - Cypress Test runner --- 我可以让“下载 React DevTools 以获得更好的开发体验”的警告消息消失吗?
- python - 使用 Selenium 抓取 javascript 网站,其中页面随机无法跨多个浏览器加载
- azure-service-fabric - 带有远程侦听器的 Azure Service Fabric ASP .net
- java - 列表
- 到地图
>、排序键、排序值和进程 - 到地图
- blob - 无法读取 Blob 存储帐户目录和子目录
- r - 如何在 RPostgreSQL 中使用参数化 IN
- mysql - MySQL 错误:1005 无法创建表(错误号:150)
- python-3.x - Pandas 日期时间问题:如何将缺少的周末插入到 python 数据框中的现有日期列中