首页 > 解决方案 > 如何将没有架构的数据发送到 kafka - 融合 jdbc - 接收器使用情况?

问题描述

我使用 conluent jdbc-sink 将我的数据从 kafka 加载到 oracle。

但是我用数据将我的模式写在值上。

我不想用数据编写模式,如何在 kafka 主题上编写模式,然后我只想从我的客户端发送数据?

提前致谢

json数据

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "field": 'ID',
                "type": "int32",
                "optional": False
            },
            {
                "field": 'PRODUCT',
                "type": "string",
                "optional": True
            },
            {
                "field": 'QUANTITY',
                "type": "int32",
                "optional": True
            },
            {
                "field": 'PRICE',
                "type": "int32",
                "optional": True
            }
        ],
        "optional": True,
        "name": "myrecord"
    },
    "payload": {
        "ID": 1071,
        "PRODUCT": 'ersin',
        "QUANTITIY": 1071,
        "PRICE": 1453
   }

蟒蛇代码:

producer.send(topic, key=b'1071'
              , value=json.dumps(v, default=json_util.default).encode('utf-8'))

我该如何解决这个问题?

提前致谢

标签: apache-kafkaapache-kafka-connectconfluent-platform

解决方案


如果要使用 JDBC 接收器连接器,则必须提供模式。这可以通过三种方式实现:

  • 使用启用了模式的 JSON
  • 使用 Avro 和模式注册表
  • 将 JSON 模式与模式注册表一起使用

您当前正在使用启用了模式的 JSON,这需要将模式与实际有效负载一起发送。实现您的要求的唯一方法是使用 Avro 和 Confluent Schema Registry,以便您的模式在模式注册表中注册。这样,您就不需要每次都发送有效负载模式。

另一种选择是将 JSON 与模式注册表 ( #1289 ) 一起使用。对于 Kafka Connect,您可以使用JsonSchemaConverter,对于 Java 消费者和生产者,您可以使用KafkaJsonSchemaSerializerKafkaJsonSchemaDeserializer.


推荐阅读