首页 > 解决方案 > Debezium + Schema Registry Avro Schema:为什么我有“之前”和“之后”字段,以及如何将其与 HudiDeltaStreamer 一起使用?

问题描述

我在 PostgreSQL 中有一个具有以下模式的表:

                                                       Table "public.kc_ds"
 Column |         Type          | Collation | Nullable |              Default              | Storage  | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
 id     | integer               |           | not null | nextval('kc_ds_id_seq'::regclass) | plain    |              |
 num    | integer               |           | not null |                                   | plain    |              |
 text   | character varying(50) |           | not null |                                   | extended |              |
Indexes:
    "kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

当我为这个使用io.confluent.connect.avro.AvroConverterSchema Registry 的表运行 Debezium 源连接器时,它会创建一个看起来像这样的 Schema Registry 模式(这里省略了一些字段):

"fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"num",
                     "type":"int"
                  },
                  {
                     "name":"text",
                     "type":"string"
                  }
               ],
               "connect.name":"xxx.public.kc_ds.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
]

我的 Kafka 主题中由 Debezium 生成的消息如下所示(省略了某些字段):

{
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
}

当我插入或更新时,"before"总是null,并且"after"包含我的数据;当我删除时,反之亦然:"after"为空并"before"包含数据(尽管所有字段都设置为默认值)。

问题 #1:为什么 Kafka Connect 使用"before""after"字段创建模式?为什么这些领域的行为如此奇怪?

问题 #2:是否有一种内置方法可以让 Kafka Connect在仍然使用 Schema Registry 的同时向我的主题发送平面消息?请注意,Flatten变换不是我需要的:如果启用,我仍将拥有"before"and"after"字段。

问题 #3(实际上并不希望得到任何东西,但也许有人知道):扁平化我的消息的必要性来自于我需要使用HudiDeltaStreamer从我的主题中读取数据,而且这个工具似乎需要扁平化的输入数据。和字段最终在生成的 .parquet 文件中成为单独的类似对象"before"。有谁知道 HudiDeltaStreamer 应该如何与 Kafka Connect 生成的消息集成?"after"

标签: apache-kafkaapache-kafka-connectconfluent-schema-registrydebeziumapache-hudi

解决方案


推荐阅读