首页 > 解决方案 > FlinkSQL:如何过滤掉 SQL 中格式错误的 JSON?

问题描述

CREATE TABLE user_log (
    a STRING,
    b STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'format.fail-on-missing-field' = 'false'
);

正确的格式是{"a":1,"b":2},但是kafka发送了错误的数据:AABB,程序会停止。SQL中如何过滤掉格式错误的JSON?</p>

标签: apache-flinkflink-streamingflink-sql

解决方案


在 Flink 1.11(即将发布)中,这些格式选项已被添加(并且都默认为 false)。请参阅FLINK-17663

'json.fail-on-missing-field' = false,
'json.ignore-parse-errors' = false,

我不确定您在早期版本中要做什么。


推荐阅读