首页 > 解决方案 > ksqlDB:可以使用 UDF 进行 SELECT,但无法使用相同的 SELECT 语句创建流

问题描述

我有一个主题包含值作为字符串需要解析,所以我创建了一个 UDF:

@UdfDescription(
    name = "PARSE_USER",
    description = "A test parsing UDF for User"
)
public class ParseUser {
    public static final Schema USER_SCHEMA = SchemaBuilder.struct().optional()
        .field("id", Schema.OPTIONAL_STRING_SCHEMA)
        .field("name", Schema.OPTIONAL_STRING_SCHEMA)
        .field("age", Schema.OPTIONAL_INT32_SCHEMA)
        .build();

    @Udf(
        description = "Parse string into an User struct",
        schema="STRUCT<id STRING, name STRING, age INT>"
    )
    public Struct parse(@UdfParameter String raw) {
        Struct result = new Struct(USER_SCHEMA);

        // Parsing logic

        return result;
    }
}

将 UDF 部署到 ksqlDB 实例后,我们创建如下流:

CREATE STREAM user_raw_stream (
  raw STRING
) WITH (
  VALUE_FORMAT='KAFKA',
  KAFKA_TOPIC='user_raw_topic',
  partitions=1
);

将一些事件提交到主题后,我可以SELECT使用 UDF:

ksql> SELECT PARSE_USER(raw) as user FROM user_raw_stream EMIT CHANGES;

+----------------------------------------------+
|USER                                          |
+----------------------------------------------+
|{id=T000003, name=Frona Ness, age=44}         |
|{id=T000001, name=Sherie Shine, age=31}       |
|{id=T000004, name=Wes Jameson, age=27}        |
|{id=T000005, name=Jarvis Stern, age=39}       |
|{id=T000002, name=Liv Denman, age=52}         |

但是我想用同样的SELECT语句来创建另一个流,我得到了一个错误:

ksql> CREATE STREAM user_struct_stream AS
>  SELECT PARSE_USER(raw) as user FROM user_raw_stream EMIT CHANGES;

Value format does not support schema.
format: KAFKA
schema: Persistence{columns=[`USER` STRUCT<`ID` STRING, `NAME` STRING, `AGE` INTEGER>], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'
Caused by: The 'KAFKA' format does not support type 'STRUCT'

为什么我可以SELECT但不能使用相同的SELECT语句来创建另一个流?

标签: apache-kafkaksqldb

解决方案


推荐阅读