apache-kafka - ksqlDB:可以使用 UDF 进行 SELECT,但无法使用相同的 SELECT 语句创建流
问题描述
我有一个主题包含值作为字符串需要解析,所以我创建了一个 UDF:
@UdfDescription(
name = "PARSE_USER",
description = "A test parsing UDF for User"
)
public class ParseUser {
public static final Schema USER_SCHEMA = SchemaBuilder.struct().optional()
.field("id", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("age", Schema.OPTIONAL_INT32_SCHEMA)
.build();
@Udf(
description = "Parse string into an User struct",
schema="STRUCT<id STRING, name STRING, age INT>"
)
public Struct parse(@UdfParameter String raw) {
Struct result = new Struct(USER_SCHEMA);
// Parsing logic
return result;
}
}
将 UDF 部署到 ksqlDB 实例后,我们创建如下流:
CREATE STREAM user_raw_stream (
raw STRING
) WITH (
VALUE_FORMAT='KAFKA',
KAFKA_TOPIC='user_raw_topic',
partitions=1
);
将一些事件提交到主题后,我可以SELECT
使用 UDF:
ksql> SELECT PARSE_USER(raw) as user FROM user_raw_stream EMIT CHANGES;
+----------------------------------------------+
|USER |
+----------------------------------------------+
|{id=T000003, name=Frona Ness, age=44} |
|{id=T000001, name=Sherie Shine, age=31} |
|{id=T000004, name=Wes Jameson, age=27} |
|{id=T000005, name=Jarvis Stern, age=39} |
|{id=T000002, name=Liv Denman, age=52} |
但是我想用同样的SELECT
语句来创建另一个流,我得到了一个错误:
ksql> CREATE STREAM user_struct_stream AS
> SELECT PARSE_USER(raw) as user FROM user_raw_stream EMIT CHANGES;
Value format does not support schema.
format: KAFKA
schema: Persistence{columns=[`USER` STRUCT<`ID` STRING, `NAME` STRING, `AGE` INTEGER>], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'
Caused by: The 'KAFKA' format does not support type 'STRUCT'
为什么我可以SELECT
但不能使用相同的SELECT
语句来创建另一个流?
解决方案
推荐阅读
- ios - 防止 SCNText 的轴心点移动到其左对齐角
- c++ - 为什么套接字中的 recv() 函数不返回任何内容?
- react-native - 如何使用 react-native 0.60 更改 ReactNative 项目的项目和应用名称
- html - Webm 视频在电影和电视应用程序中播放视频和音频,但只能在 Windows 媒体播放器和 html 中播放视频?
- scala - 如何增加嵌入式 kafka 的 RAM?
- c# - 如何从主控件绑定到子控件
- android - 如何在android中组合来自String对象的多个结果?
- php - PHP 注意:数组到字符串的转换;数据库工厂
- reactjs - 如何在 React Router Dom 的 useHistory 中发送参数?
- python-3.x - 如何从熊猫数据框中的列中删除不需要的文本