首页 > 解决方案 > 使用 KSQL 在 kafka 主题上创建流

问题描述

以下是我来自 kafka 主题的示例日志

2019-03-04T08:53:03.023Z "cd8cbe" 100.212.212.212 - - [20/Feb/2019:12:13:33 +0000] "GET http://dl-mysite.com/drm/PRIORITY1080/HINDI_MOVIES/somemovie.mp4/video/avc1/4/seg-1281.m4s HTTP/1.1" 200 325040 "-" "Dalvik/2.1.0 (Linux; U; Android 6.0; Le X509 Build/DHXOSOP5801911241S)" "256" "0.000"

我正在尝试使用 KSQL 在上述主题上创建流。下面是我用来创建流的脚本。运行以下流创建脚本后,它返回“流创建”消息,但是选择语句(select * from test_duplicate_stream;)不返回任何内容。

CREATE STREAM test_duplicate_stream (logArrivalTime varchar,edgeid varchar,ip varchar,col1_empty varchar,col2_empty varchar, eventdate varchar,url varchar,response_code int,response_length BIGINT,col3_empty varchar,user_agent varchar,request_length varchar, response_time varchar) WITH (kafka_topic='test_duplicate',VALUE_FORMAT='DELIMITED');

我相信“DELIMITED”不是在这里使用的正确值,因为我的字段不是逗号分隔而是空格分隔。为我的日志线创建流的正确方法是什么?

标签: apache-kafkaksqldb

解决方案


KSQL 目前支持:

  • JSON
  • 逗号分隔(分隔)
  • 阿夫罗

如果您的数据不是上述格式之一,那么您将无法在不先更改序列化的情况下处理它。

另请参阅有关在查询未返回数据时对 KSQL 进行故障排除的说明。


推荐阅读