apache-kafka - KSQL: creating a stream from JSON fields whose names contain periods (`.` dot notation)
Problem description
I have JSON records like: {"destination.port":"443","network.packets":"4464","event.end":"2019-07-19T07:47:22.000Z","source.address":"1.2.2.3","message":"OK","server.address":"ip-1-2-2-3.ec2.internal","event.action":"ACCEPT","event.module":"S3bucket","source.port":"56448","network.protocol":"6","cloud.account.id":"512889038796","event.type":"AWS_VPC_log","organization.id":"DeloitteFusion","destination.address":"1.2.2.3","network.bytes":"178584","event.start":"2019-07-19T07:46:22.000Z","event.kind":"2","host.id":"eni-0c5e3a6282a912997","timestamp":"2019-07-19T07:51:52.584Z","srckey_val":"167772160_184549375","srckey_rev":"15072019_1541"}
I created the KSQL stream as: create stream vpc_log ("destination.port" integer, "network.packets" integer, "event.end" varchar, "source.address" varchar, message varchar, "server.address" varchar, "event.action" varchar, "event.module" varchar, "source.port" integer, "network.protocol" integer, "cloud.account.id" bigint, "event.type" varchar, "organization.id" varchar, "destination.address" varchar, "network.bytes" integer, "event.start" varchar, "event.kind" integer, "host.id" varchar, timestamp varchar, srckey_val varchar, srckey_rev varchar) WITH (KAFKA_TOPIC='client_data_parsed', VALUE_FORMAT='JSON');
Running select * from vpc_log; then throws the following error: Caused by: Cannot create field because of field name duplication address
So I modified the stream definition to make the column names unique: create stream vpc_log ("destination.port2" integer, "network.packets" integer, "event.end" varchar, "source.addres" varchar, message varchar, "server.adress" varchar, "event.action" varchar, "event.module" varchar, "source.port1" integer, "network.protocol" integer, "cloud.account.id2" bigint, "event.type" varchar, "organization.id" varchar, "destination.adres" varchar, "network.bytes" integer, "event.start" varchar, "event.kind" integer, "host.id1" varchar, timestamp varchar, srckey_val varchar, srckey_rev varchar) WITH (KAFKA_TOPIC='client_data_parsed', VALUE_FORMAT='JSON');
The output of select * is: 1563522879847 | null | null | null | null | null | OK | null | null | null | null | null | null | null | null | null | null | null | null | null | 2019-07-19T07:51:52.584Z | 167772160_184549375 | 15072019_1541
All the dotted (.) fields/keys come back as null. I have looked at related links, but none of them provided a solution. Help me understand what is going on.
I also tried escaping the dots:
create stream vpc_log ("DESTINATION\.PORT1" INTEGER, "NETWORK\.PACKETS" INTEGER, "EVENT\.END" VARCHAR, "SOURCE\.ADDRESS1" VARCHAR, MESSAGE VARCHAR, "SERVER\.ADDRESS2" VARCHAR, "EVENT\.ACTION" VARCHAR, "EVENT\.MODULE" VARCHAR, "SOURCE\.PORT2" INTEGER, "NETWORK\.PROTOCOL" INTEGER, "CLOUD\.ACCOUNT\.ID" BIGINT, "EVENT\.TYPE" VARCHAR, "ORGANIZATION\.ID1" VARCHAR, "DESTINATION\.ADDRESS3" VARCHAR, "NETWORK\.BYTES" INTEGER, "EVENT\.START" VARCHAR, "EVENT\.KIND" INTEGER, "HOST\.ID2" VARCHAR, TIMESTAMP VARCHAR, SRCKEY_VAL VARCHAR, SRCKEY_REV VARCHAR) WITH (KAFKA_TOPIC='client_data_parsed', VALUE_FORMAT='JSON');
Still the same problem. As a quick workaround I can use keys without dots, but I would like to understand how KSQL interprets a dot in a field name.
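As a sketch of the "no dots in keys" quick workaround mentioned above: if the producer can be changed to publish the same records with underscore-separated keys, the stream definition needs no quoting at all. The topic name client_data_renamed below is hypothetical.

```sql
-- Sketch of the no-dots workaround, assuming the producer now writes
-- underscore-separated keys (e.g. "destination_port" instead of
-- "destination.port") to a hypothetical topic client_data_renamed.
CREATE STREAM vpc_log_renamed (
  destination_port    INTEGER,
  network_packets     INTEGER,
  event_end           VARCHAR,
  source_address      VARCHAR,
  message             VARCHAR,
  server_address      VARCHAR,
  event_action        VARCHAR,
  event_module        VARCHAR,
  source_port         INTEGER,
  network_protocol    INTEGER,
  cloud_account_id    BIGINT,
  event_type          VARCHAR,
  organization_id     VARCHAR,
  destination_address VARCHAR,
  network_bytes       INTEGER,
  event_start         VARCHAR,
  event_kind          INTEGER,
  host_id             VARCHAR,
  timestamp           VARCHAR,
  srckey_val          VARCHAR,
  srckey_rev          VARCHAR
) WITH (KAFKA_TOPIC='client_data_renamed', VALUE_FORMAT='JSON');
```

Because every column name is now a plain identifier, there are no duplicate-name collisions and the JSON deserializer can match each key directly.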
Solution
I got an answer from a KSQL developer in the Slack community; it may help someone. KSQL has no official support for this, but one workaround is to escape the period. Install ksql v5.3.0 from here.
create stream vpc_log ("DESTINATION\.PORT" INTEGER, "NETWORK\.PACKETS" INTEGER, "EVENT\.END" VARCHAR, "SOURCE\.ADDRESS" VARCHAR, MESSAGE VARCHAR, "SERVER\.ADDRESS" VARCHAR, "EVENT\.ACTION" VARCHAR, "EVENT\.MODULE" VARCHAR, "SOURCE\.PORT" INTEGER, "NETWORK\.PROTOCOL" INTEGER, "CLOUD\.ACCOUNT\.ID" BIGINT, "EVENT\.TYPE" VARCHAR, "ORGANIZATION\.ID" VARCHAR, "DESTINATION\.ADDRESS" VARCHAR, "NETWORK\.BYTES" INTEGER, "EVENT\.START" VARCHAR, "EVENT\.KIND" INTEGER, "HOST\.ID" VARCHAR, TIMESTAMP VARCHAR, SRCKEY_VAL VARCHAR, SRCKEY_REV VARCHAR) WITH (KAFKA_TOPIC='client_data_parsed', VALUE_FORMAT='JSON');
Thanks to Robin Moffatt.
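For completeness, a sketch of querying the stream created with the escaped names: the same backslash-escaped, double-quoted identifiers are used in the SELECT. Identifier-quoting behaviour has changed across KSQL/ksqlDB versions, so treat this as a sketch for the v5.3.0 build referenced above.

```sql
-- Query the escaped columns using the same quoted identifiers as in the
-- CREATE STREAM statement. (On KSQL 5.3 a plain SELECT runs as a
-- continuous push query; later ksqlDB versions require an explicit
-- EMIT CHANGES clause at the end.)
SELECT "DESTINATION\.PORT",
       "SOURCE\.ADDRESS",
       "EVENT\.ACTION",
       MESSAGE
FROM vpc_log;
```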