apache-kafka - 如何将结构化记录直接从 KSQL 下沉到连接器(例如,InfluxDB)
问题描述
我正在尝试将数据直接从 KSQL 接收到 InfluxDB(或任何其他需要定义的连接器)。我能够让事情在简单的情况下工作,但是当模式需要复杂类型时我开始遇到麻烦。(即,InfuxDB 的标签)。
这是我的流/模式的示例:
Field | Type
-------------------------------------------------------------------
ROWKEY | VARCHAR(STRING) (primary key)
FIELD_1 | VARCHAR(STRING)
FIELD_2 | VARCHAR(STRING)
FIELD_3 | VARCHAR(STRING)
FIELD_4 | DOUBLE
TAGS | MAP<STRING, VARCHAR(STRING)>
如果我手动创建 AVRO 模式并填充来自简单生产者的记录,我可以通过此处的入门指南并嵌入 InfluxDB 的标签。
但是,当我迁移到 KSQL 时,如果我尝试将 AVRO 流直接下沉到 InfluxDB 中,我会丢失有关复杂类型(标签)的信息。我注意到这篇博文中的警告,“警告 ksqlDB/KSQL 尚不能以与此连接器兼容的 Avro 格式写入数据”
接下来,我尝试将 AVRO 流转换为 JSON 格式,但现在我明白我必须在每条记录中指定模式,类似于这个问题所提出的问题。我无法将 AVRO 流转换为 JSON 流并同时嵌入模式和有效负载。
最后,我看到了kafkacat 的“抖动解决方案”,但这将迫使我将记录从 KSQL 转储到 kafkacat,然后在最终到达 Influx 之前返回到 Kafka。
有没有一种方法可以将 JSON 或 AVRO 格式的复杂记录直接从 KSQL 接收到连接器中?
解决方案
我想 ksqlDB 还不能以 InfluxDB 需要的格式输出 AVRO 数据的原因是因为它不会将TAGS
字段输出为 Avromap
类型,因为 Avro 映射需要非空键并且 SQLMAP<STRING, STRING>
类型允许空键。因此 ksqlDB 将映射序列化为array
键值条目的 Avro。
要使用 Avro,您需要:
- 支持非空类型:https ://github.com/confluentinc/ksql/issues/4436 ,或
- 支持使用现有的 Avro 架构:https ://github.com/confluentinc/ksql/issues/3634
请随时对这些问题进行投票/评论以提高他们的个人资料。
以前,基于 JSON 的解决方案不起作用,因为正如您所指出的,连接器需要嵌入在有效负载中的 JSON 模式。但是,最新版本的 Confluent Platform / Schema Registry 支持 Schema Registry 中的 JSON 模式。因此,虽然我没有尝试过,但升级到最新的 CP 版本可能意味着基于 JSON 的解决方案将起作用。如果没有,可能需要提交 Jira/Github 票证以升级适当的组件以使其正常工作。
推荐阅读
- sql - 过去 6 个月的用户数图 - Laravel
- google-cloud-platform - 从谷歌云获取访问令牌
- java - Spring boot 错误“TrainerController 中的字段存储库需要一个无法找到的 TrainerRepo 类型的 bean。” 。”
- vue.js - Vue中最好的博客管理系统框架
- php - 如何使用 eloquent 成功地将外键约束添加到 laravel 中的数据库表?
- flutter - 了解 ScopedModel 和 Flutter 中的状态
- reactjs - 将 SVG div 加载到 React 组件
- reactjs - 如何将reducer的返回值设置为react js组件中的状态对象
- amazon-web-services - 当 2 个 lambda 具有相同的运动流时,会有什么行为?
- python - 使用 Paramiko 从远程服务器执行命令到另一个远程服务器