首页 > 解决方案 > Spark JDBC过滤边界外的记录

问题描述

我正在尝试优化一项日常工作,该工作将三个月的数据从 MySQL 表中提取到 HDFS 上的 parquet 中。他们目前mysqldump以一种非常有创意的方式使用,但有一个 spark/hdfs 生态系统,所以我想我会改用它。

背景

我定义了如何像这样读取数据库:

# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period

df = session.read \
  .format("jdbc") \
  .option("url", url) \
  .option("driver", driver) \
  .option("dbtable", "table1") \
  .option("user", username) \
  .option("password", password) \
  .option("partitionColumn","time_col") \
  .option("upperBound", end_time) \
  .option("lowerBound", start_time) \
  .option("numPartitions", partitions) \
  .load()

除了第一个和最后一个分区有数十亿条我什至不想要的记录之外,这真的非常有效;

为了过滤掉绝大多数的表,我dtable这样更新

.option("dtable", "(select * from table1 WHERE time_col >= {} and time_col < {}) as table2".format(start_time, end_time))

这种工作。当end_time-start_time很小时,工作运行得很好,但不能扩展到 3 个月。

这是因为每个分区的查询现在都包含一个派生表

EXPLAIN SELECT * FROM (SELECT * From table1 WHERE time_col >=1585780000 AND time_col < 1585866400 ) as table2 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
| id | select_type | table      | type  | possible_keys | key      | key_len | ref  | rows     | Extra       |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
|  1 | PRIMARY     | <derived2> | ALL   | NULL          | NULL     | NULL    | NULL | 25048354 | Using where |
|  2 | DERIVED     | table1     | range | time_col      | time_col | 4       | NULL | 25048354 | Using where |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+

相比之下,这是我刚刚使用时生成的查询的样子dtable = "table1";更简单更快

explain SELECT * From table1 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
| id | select_type | table  | type  | possible_keys | key      | key_len | ref  | rows    | Extra       |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
|  1 | SIMPLE      | table1 | range | time_col      | time_col | 4       | NULL | 1097631 | Using where |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+

问题

我有什么办法可以过滤掉外部的数据upperBoundlowerBound同时仍然保持更简单的查询?就像阻止第一个和最后一个分区运行或dtable在运行时覆盖一样,它只用table1?替换子查询

参数

我只有对 MySQL 5.7 上的表的读取权限,不能创建视图或索引

我正在 Spark 3.1 上开发,但我相信生产是在 Spark 2 上

是的,我考虑过 Spark 结构化流和其他流选项,但这不是我们目前的方向。

标签: mysqlapache-sparkhdfs

解决方案


我发现如果我添加 where() 方法,我可以避免子查询。例子:

# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period

df = session.read \
  .format("jdbc") \
  .option("url", url) \
  .option("driver", driver) \
  .option("dbtable", "table1") \
  .option("user", username) \
  .option("password", password) \
  .option("partitionColumn","time_col") \
  .option("upperBound", end_time) \
  .option("lowerBound", start_time) \
  .option("numPartitions", partitions) \
  .load()

# This filters out everything outside of boundaries
# without creating a subquery
df.where('time_col >= {} AND time_col < {}'.format(start_time,end_time))

Spark 能够将子句与分区逻辑创建的子句一起添加。因此,没有子查询和更好的性能。


推荐阅读