首页 > 解决方案 > Flink SQL:源表太大而无法放入内存

问题描述

我对 Flink 比较陌生,今天在 Flink 1.11.3 会话集群上使用 Flink SQL 时遇到了一个问题。

问题

我注册了一个使用 jdbc postgres 驱动程序的源表。我正在尝试以parquet格式将一些数据从这个在线数据库移动到 AWS S3。这个表很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器内存不足。

我的观察

我发现当我做tableEnv.executeSql("select ... from huge_table limit 1000")flink 尝试将整个源表扫描到内存中时,才计划做限制。

问题

由于我只关心最近几天的数据,有没有办法限制一个作业按时间戳扫描多少行?

附录

这是可以重现问题的最小设置(消除了很多噪音)

环境设置代码

var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

Flink SQL 中的源表 DDL

CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

Flink SQL 中的 Sink 表 DDL

CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector'='filesystem',
    'path'='s3a://s3/path/to/transactions',
    'format'='parquet'
)

在 Flink SQL 中插入查询

INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions

标签: sqlpostgresqlapache-flinkflink-sqlcockroachdb

解决方案


您的观察是正确的,Flink 不支持对 JDBC 连接器的限制下推优化,并且有一个几乎合并的 PR 来支持此功能,这将在 Flink 1.13 中使用,如果您是,您可以在您的代码中挑选这个补丁迫切需要这个功能。

1.JIRA:FLINK-19650 支持Jdbc的限制下推

2.公关:https ://github.com/apache/flink/pull/13800


推荐阅读