首页 > 解决方案 > 使用 (Py)Spark 从 Postgres 并行读取和处理

问题描述

我有一个关于从 Postgres 数据库中读取大量数据并使用 spark 并行处理它的问题。假设我在 Postgres 中有一个表,我想使用 JDBC 读入 Spark。假设它具有以下列:

目前 Postgres 表没有分区。我想并行转换大量数据,并最终将转换后的数据存储在其他地方。

问题:我们如何优化从 Postgres 中并行读取数据?

文档(https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)建议使用 apartitionColum来并行处理查询。此外,需要设置lowerBoundupperBound。据我了解,就我而言,我可以使用该列iddatefor partitionColumn。但是,这里的问题是如何在对其中一列进行分区时设置lowerBound和值。upperBound我注意到如果设置不当,我的情况会出现数据偏差。对于 Spark 中的处理,我不关心自然分区。我只需要尽可能快地转换所有数据,因此我认为优化未倾斜分区是首选。

我已经为此提出了一个解决方案,但我不确定这样做是否真的有意义。本质上,它是将 id 散列到分区中。我的解决方案是在具有指定数量的分区mod()的列上使用。id那么 中的dbtable字段将类似于:

"(SELECT *, mod(id, <<num-parallel-queries>>) as part FROM <<schema>>.<<table>>) as t"

然后我使用partitionColum="part"lowerBound=0upperBound=<<num-parallel-queries>>作为 Spark 读取 JDBC 作业的选项。

请让我知道这是否有意义!

标签: pythonpostgresqlapache-sparkjdbc

解决方案


按主键列“分区”是个好主意。

要获得大小相等的分区,请使用表统计信息:

SELECT histogram_bounds::text::bigint[]
FROM pg_stats
WHERE tablename = 'mytable'
  AND attname = 'id';

如果您有default_statistics_target默认值 100,这将是一个包含 101 个值的数组,将百分位数从 0 到 100 分隔开来。您可以使用它来对表进行均匀分区。

例如:如果数组看起来像这样{42,10001,23066,35723,49756,...,999960},并且您需要 50 个分区,则第一个分区是id< 23066 的所有行,第二个分区是 23066 ≤ id< 49756 的所有行,依此类推。


推荐阅读