apache-kafka - 如何将没有架构的数据发送到 kafka - 融合 jdbc - 接收器使用情况?
问题描述
我使用 conluent jdbc-sink 将我的数据从 kafka 加载到 oracle。
但是我用数据将我的模式写在值上。
我不想用数据编写模式,如何在 kafka 主题上编写模式,然后我只想从我的客户端发送数据?
提前致谢
json数据
{
"schema": {
"type": "struct",
"fields": [
{
"field": 'ID',
"type": "int32",
"optional": False
},
{
"field": 'PRODUCT',
"type": "string",
"optional": True
},
{
"field": 'QUANTITY',
"type": "int32",
"optional": True
},
{
"field": 'PRICE',
"type": "int32",
"optional": True
}
],
"optional": True,
"name": "myrecord"
},
"payload": {
"ID": 1071,
"PRODUCT": 'ersin',
"QUANTITIY": 1071,
"PRICE": 1453
}
蟒蛇代码:
producer.send(topic, key=b'1071'
, value=json.dumps(v, default=json_util.default).encode('utf-8'))
我该如何解决这个问题?
提前致谢
解决方案
如果要使用 JDBC 接收器连接器,则必须提供模式。这可以通过三种方式实现:
- 使用启用了模式的 JSON
- 使用 Avro 和模式注册表
- 将 JSON 模式与模式注册表一起使用
您当前正在使用启用了模式的 JSON,这需要将模式与实际有效负载一起发送。实现您的要求的唯一方法是使用 Avro 和 Confluent Schema Registry,以便您的模式在模式注册表中注册。这样,您就不需要每次都发送有效负载模式。
另一种选择是将 JSON 与模式注册表 ( #1289 ) 一起使用。对于 Kafka Connect,您可以使用JsonSchemaConverter
,对于 Java 消费者和生产者,您可以使用KafkaJsonSchemaSerializer
和KafkaJsonSchemaDeserializer
.
推荐阅读
- express - 如何在 Nuxt JS 中使用 Express 服务器后端
- objective-c - 在 xcode10 中运行测试用例时面临的问题
- api - 如何从 Postman 的响应正文中设置环境变量,其中相关关键字是连字符的
- spring-webflux - 单核细胞增多症
- > 与 Flux 的区别
在 Spring webflux 中 - java - 无法使用gmail登录
- excel - VBA - 对特定单元格求和
- angular - ngx-admin 中的新登录页面,带有自己的身份验证服务
- visual-studio - 将 VSS 迁移到 TFS
- android - 如何在 Flutter 应用程序中的任何设备上的任何屏幕上找到任何元素的确切绝对位置
- javascript - 基于共同值将对象合并到多维数组