sql - Flink SQL:源表太大而无法放入内存
问题描述
我对 Flink 比较陌生,今天在 Flink 1.11.3 会话集群上使用 Flink SQL 时遇到了一个问题。
问题
我注册了一个使用 jdbc postgres 驱动程序的源表。我正在尝试以parquet格式将一些数据从这个在线数据库移动到 AWS S3。这个表很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器内存不足。
我的观察
我发现当我做tableEnv.executeSql("select ... from huge_table limit 1000")
flink 尝试将整个源表扫描到内存中时,才计划做限制。
问题
由于我只关心最近几天的数据,有没有办法限制一个作业按时间戳扫描多少行?
附录
这是可以重现问题的最小设置(消除了很多噪音)
环境设置代码
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);
Flink SQL 中的源表 DDL
CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)
Flink SQL 中的 Sink 表 DDL
CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)
在 Flink SQL 中插入查询
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
解决方案
您的观察是正确的,Flink 不支持对 JDBC 连接器的限制下推优化,并且有一个几乎合并的 PR 来支持此功能,这将在 Flink 1.13 中使用,如果您是,您可以在您的代码中挑选这个补丁迫切需要这个功能。
1.JIRA:FLINK-19650 支持Jdbc的限制下推
推荐阅读
- python - 按钮没有给出正确的值
- android - 第一次点击时未调用 onOptionsItemSelected
- reactjs - 我可以依赖组件中的 useEffect 顺序吗?
- java - 无法使用 maven 构建项目 - 兼容性错误
- angular - Angular 应用程序到 CRM API - CORS 策略已阻止从源“localhost”访问“”处的 XMLHttpRequest
- mysql - 无法在 EKS kubernetes 中使用 EBS 卷运行 mysql
- xamarin.forms - Xamarin Forms Shell FlyoutItem Disabled VisualState 不起作用
- javascript - 如何使用Angular根据其他对象中的键过滤结果?
- mysql - 这个mysql语句有什么问题?
- java - 手动将数据库中的数据插入到具有 hibernate_sequence @GeneratedValue(strategy = GenerationType.TABLE) 的表中