python - 使用 (Py)Spark 从 Postgres 并行读取和处理
问题描述
我有一个关于从 Postgres 数据库中读取大量数据并使用 spark 并行处理它的问题。假设我在 Postgres 中有一个表,我想使用 JDBC 读入 Spark。假设它具有以下列:
- id(大整数)
- 日期(日期时间)
- 许多其他列(不同类型)
目前 Postgres 表没有分区。我想并行转换大量数据,并最终将转换后的数据存储在其他地方。
问题:我们如何优化从 Postgres 中并行读取数据?
文档(https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)建议使用 apartitionColum
来并行处理查询。此外,需要设置lowerBound
和upperBound
。据我了解,就我而言,我可以使用该列id
和date
for partitionColumn
。但是,这里的问题是如何在对其中一列进行分区时设置lowerBound
和值。upperBound
我注意到如果设置不当,我的情况会出现数据偏差。对于 Spark 中的处理,我不关心自然分区。我只需要尽可能快地转换所有数据,因此我认为优化未倾斜分区是首选。
我已经为此提出了一个解决方案,但我不确定这样做是否真的有意义。本质上,它是将 id 散列到分区中。我的解决方案是在具有指定数量的分区mod()
的列上使用。id
那么 中的dbtable
字段将类似于:
"(SELECT *, mod(id, <<num-parallel-queries>>) as part FROM <<schema>>.<<table>>) as t"
然后我使用partitionColum="part"
、lowerBound=0
和upperBound=<<num-parallel-queries>>
作为 Spark 读取 JDBC 作业的选项。
请让我知道这是否有意义!
解决方案
按主键列“分区”是个好主意。
要获得大小相等的分区,请使用表统计信息:
SELECT histogram_bounds::text::bigint[]
FROM pg_stats
WHERE tablename = 'mytable'
AND attname = 'id';
如果您有default_statistics_target
默认值 100,这将是一个包含 101 个值的数组,将百分位数从 0 到 100 分隔开来。您可以使用它来对表进行均匀分区。
例如:如果数组看起来像这样{42,10001,23066,35723,49756,...,999960}
,并且您需要 50 个分区,则第一个分区是id
< 23066 的所有行,第二个分区是 23066 ≤ id
< 49756 的所有行,依此类推。
推荐阅读
- python - python在同一文件夹中导入文件
- javascript - Fabric.JS 与 Node.JS - 导出为 PNG/JPEG
- swift - 用于条码扫描的快速 Avcapture 会话不起作用
- google-colaboratory - 从介绍页面运行演示的 OpenAI Gym 错误
- amazon-web-services - AWS CloudFront 如何检测移动设备
- javascript - 如何在不使用 node.js 的情况下添加标准的反应应用程序
- tfs - 执行:执行(0,0):错误:未找到:python2
- apache-kafka - 使用用于 sqlite 的 jdbc 连接器使用 kafka 连接构建管道
- amazon-web-services - AWS S3 MV 致命错误:('读取操作超时',)
- docker - 如何在没有互联网访问的情况下使用掌舵图