java - KSQL:UDF 不接受参数(STRING、STRING)
问题描述
我在尝试使用 UDF 设置 ETL 管道时遇到了 KSQL 问题。在 ETL 过程中的某个时刻,我需要将特定信息与数据中的描述字段 (VARCHAR) 隔离开来。一个虚构的上下文示例:
描述=“物种=dog.sex=male.color=blonde.age=10。” (真实数据格式相同)
我编写了一个简单的 UDF 来按需隔离任何信息。它看起来像这样:
package com.my.package;
/** IMPORTS **/
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
/** ClASS DEFINITION **/
@UdfDescription(name = "extract_from_description",
author = "Me",
version = "0.0.1",
description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='".)
public class Extract_From_Description {
@Udf(description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='.)
public String extract_from_description(final String description, final String request) {
return description.split(request)[1].split("\\.")[0];
}
}
我可以很好地上传和注册该功能,它在我运行时被正确列出和描述:
ksql> list functions;
ksql> describe function EXTRACT_FROM_DESCRIPTION;
我调用这样的函数来创建一个新流:
CREATE STREAM result AS
SELECT recordId,
OtherVariables,
EXTRACT_FROM_DESCRIPTION(description, 'species=') AS species
FROM parent_stream
EMIT CHANGES;
在那里我得到一个我无法理解的错误:
函数“extract_from_description”不接受参数(字符串、字符串)。有效的替代方案是:
显然 KSQL 无法正确解释函数的输入应该是什么(看起来它不需要输入?)我不知道为什么。我已经阅读了文档,看看我是否以一种奇怪的方式定义了我的函数,但找不到示例和我的函数之间的任何区别。我确实注意到应该有几种方法来定义函数采用的输入并尝试了所有方法,但结果总是相同的。
我使用 Maven 为这个函数创建了 jar 文件(JDK1.8.0_201)。谁能帮我弄清楚发生了什么?
TL;DR: My KSQL UDF 不接受类型 (String, String) 的输入,即使函数指定输入应该是 (String, String) 类型
解决方案
发现问题,在这里为可能遇到相同问题的任何人回答。您需要使用@UdfParameter 指定参数,如下所示:
import io.confluent.ksql.function.udf.UdfParameter; // add this to the list of imports
// add @UdfParameter(name) to each input variable
public String extract_from_description(@UdfParameter(value = "description") final String description, @UdfParameter(value = "request") final String request){
function body
}
推荐阅读
- javascript - 如何避免 Vuetify 覆盖默认 CSS
- java - 在计算列表奇数的平均值时遇到问题
- c# - 删除 UriBuilder 中路径的最左侧部分
- kubernetes - 我们可以使用 kubectl cp 查看传输进度吗?
- ruby-on-rails - 在 ruby 字符串属性中嵌入超链接?
- python - 如何在烧瓶应用程序的 sqlalchemy 中处理符号(*、+、~、...)
- ocaml - 如何在 OCaml 中使用正常的模运算
- c# - C# RSA 在 BouncyCastle 的帮助下使用给定的公钥加密文本
- python - 应用 groupby 和 size 后 Python pandas 访问 True 和 False 值
- sql - SQL Server 设置创建表时的默认值