MAP scalar function issue in ksqlDB

Problem description

I have a custom UDF (check_error) that takes a Map<String, String> as input. In a KSQL stream, I build a MAP and pass it to the UDF.

CREATE STREAM CHECK (VAL1 STRING, VAL2 DECIMAL(3,0)) WITH (KAFKA_TOPIC='CHECK', VALUE_FORMAT='AVRO');
select MAP('VAL1' := VAL1, 'VAL2' := CAST(VAL2 as STRING)) from CHECK emit changes;

Below are the INSERT INTO statements I used for testing. When either VAL1 or VAL2 is NULL, the entire MAP output becomes null.

insert into check (VAL1, VAL2) VALUES ('my name', 123);
insert into check (VAL2) VALUES (123);
insert into check (VAL1) VALUES ('my name');

Actual Output -

{VAL1=my name, VAL2=123}
null
null

Preferred Output -

{VAL1=my name, VAL2=123}
{VAL1=null, VAL2=123}
{VAL1=my name, VAL2=null}

Here null is an actual NULL value, not a string.

Is it a feature of the MAP function in ksqlDB that the whole map becomes null whenever one of its values is null? Is there a way to get the preferred output? I have tried several approaches but have not been able to get it.

If I use the IFNULL function, I get the whole map in the output, but it is still not the preferred output.

select map('VAL1' := IFNULL(val1, 'null'), 'VAL2' := CASE WHEN val2 IS NULL THEN 'null' WHEN val2 IS NOT NULL THEN CAST(val2 AS STRING) END) from check emit changes;

Output - 
{VAL1=my name, VAL2=123}
{VAL1=null, VAL2=123}
{VAL1=my name, VAL2=null}

NOTE: Here null is the STRING 'null', not an actual NULL value.

Tags: apache-kafka, ksqldb

Solution


I got the desired output by using the AS_MAP() scalar function instead of building the map with MAP(...) as I did in the question.

New KSQL query used -

select AS_MAP(ARRAY['VAL1', 'VAL2'], ARRAY[VAL1, CAST(VAL2 as STRING)]) from CHECK emit changes;

ARRAY is used to build the lists, because AS_MAP takes two lists as input: one of keys and one of values.
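
On the UDF side, this means check_error can now receive a map in which some values are actual nulls. The following is only a rough sketch in plain Java, not the actual check_error implementation: the class name CheckErrorSketch and the method keysWithNullValue are hypothetical, and the ksqlDB annotations a real UDF would carry (@UdfDescription on the class, @Udf on the method) are left out so that the example runs without the ksqlDB dependency. It shows one way a function body might detect which keys arrived with a null value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the body of a UDF such as check_error.
// A real ksqlDB UDF would also be annotated; that is omitted here.
class CheckErrorSketch {

    // Returns the keys whose value is an actual null (not the string "null"),
    // in sorted key order so the result is deterministic.
    static List<String> keysWithNullValue(Map<String, String> input) {
        List<String> missing = new ArrayList<>();
        if (input == null) {
            // The whole map was null; nothing to inspect.
            return missing;
        }
        // Copy into a TreeMap to iterate in sorted key order.
        for (Map.Entry<String, String> entry : new TreeMap<>(input).entrySet()) {
            if (entry.getValue() == null) {
                missing.add(entry.getKey());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Mirrors the second test row: VAL1 is NULL, VAL2 is 123.
        Map<String, String> row = new HashMap<>();
        row.put("VAL1", null);
        row.put("VAL2", "123");
        System.out.println(keysWithNullValue(row)); // prints [VAL1]
    }
}
```

The explicit null check on the map itself is kept because, with the original MAP(...) query, the UDF would have received a null map for such rows rather than a map with null values.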

