首页 > 解决方案 > 如何将结构化记录直接从 KSQL 下沉到连接器(例如,InfluxDB)

问题描述

我正在尝试将数据直接从 KSQL 接收到 InfluxDB(或任何其他需要定义的连接器)。我能够让事情在简单的情况下工作,但是当模式需要复杂类型时我开始遇到麻烦。(即,InfuxDB 的标签)。

这是我的流/模式的示例:

 Field    | Type                                                   
-------------------------------------------------------------------
 ROWKEY   | VARCHAR(STRING)  (primary key)
 FIELD_1  | VARCHAR(STRING)                                        
 FIELD_2  | VARCHAR(STRING)                                        
 FIELD_3  | VARCHAR(STRING)                                        
 FIELD_4  | DOUBLE                                                 
 TAGS     | MAP<STRING, VARCHAR(STRING)> 

如果我手动创建 AVRO 模式并填充来自简单生产者的记录,我可以通过此处的入门指南并嵌入 InfluxDB 的标签。

但是,当我迁移到 KSQL 时,如果我尝试将 AVRO 流直接下沉到 InfluxDB 中,我会丢失有关复杂类型(标签)的信息。我注意到这篇文中的警告,“警告 ksqlDB/KSQL 尚不能以与此连接器兼容的 Avro 格式写入数据”

接下来,我尝试将 AVRO 流转换为 JSON 格式,但现在我明白我必须在每条记录中指定模式,类似于这个问题所提出的问题。我无法将 AVRO 流转换为 JSON 流并同时嵌入模式和有效负载。

最后,我看到了kafkacat 的“抖动解决方案”,但这将迫使我将记录从 KSQL 转储到 kafkacat,然后在最终到达 Influx 之前返回到 Kafka。

有没有一种方法可以将 JSON 或 AVRO 格式的复杂记录直接从 KSQL 接收到连接器中?

标签: apache-kafkaksqldbconfluent-platform

解决方案


我想 ksqlDB 还不能以 InfluxDB 需要的格式输出 AVRO 数据的原因是因为它不会将TAGS字段输出为 Avromap类型,因为 Avro 映射需要非空键并且 SQLMAP<STRING, STRING>类型允许空键。因此 ksqlDB 将映射序列化为array键值条目的 Avro。

要使用 Avro,您需要:

  1. 支持非空类型:https ://github.com/confluentinc/ksql/issues/4436 ,或
  2. 支持使用现有的 Avro 架构:https ://github.com/confluentinc/ksql/issues/3634

请随时对这些问题进行投票/评论以提高他们的个人资料。

以前,基于 JSON 的解决方案不起作用,因为正如您所指出的,连接器需要嵌入在有效负载中的 JSON 模式。但是,最新版本的 Confluent Platform / Schema Registry 支持 Schema Registry 中的 JSON 模式。因此,虽然我没有尝试过,但升级到最新的 CP 版本可能意味着基于 JSON 的解决方案将起作用。如果没有,可能需要提交 Jira/Github 票证以升级适当的组件以使其正常工作。


推荐阅读