Custom partitioning over JDBC in PySpark

Question

I want to process a huge table from an Oracle database in pyspark, but I want to partition it using a custom query. For example, say the table has a column containing user names, and I want to partition the data by the first letter of the user name. Or say every record has a date, and I want to partition by month. And since the table is huge, I absolutely need the data of each partition to be fetched directly by its executor, not by the master. So, can I do that in pyspark?

P.S.: The reason I need to control the partitioning is that I need to perform some aggregations on each partition (partitions that carry meaning, not just distributed data), so I want them to be on the same machine to avoid any shuffles. Is this possible? Or am I doing something wrong?

Note

I don't care about even or skewed partitions! I want all the related records (e.g. all the records of a user, or all the records from a city, etc.) to be partitioned together, so that they reside on the same machine and I can aggregate them without any shuffling.

Tags: apache-spark, pyspark

Solution


As it turns out, Spark does have a way of precisely controlling the partitioning logic: the predicates argument of spark.read.jdbc.

What I ended up with is the following:

(For the sake of this example, say we have the purchase records of a shop, and we need to partition them by both userId and productId so that all the records of an entity are kept together on the same machine, and we can perform aggregations on these entities without any shuffling.)

  • First, generate a histogram of every column you want to partition by (the count of each value):

    userId      count
    123456      1640
    789012      932
    345678      1849
    901234      11
    ...         ...

    productId     count
    123456789     5435
    523485447     254
    363478326     2343
    326484642     905
    ...           ...
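The histogram step is just a per-column GROUP BY count. A minimal in-memory sketch of the idea (the purchases sample data is made up for illustration; in practice you would get these counts straight from Oracle with a query like SELECT USER_ID, COUNT(*) FROM MY_TABLE GROUP BY USER_ID, and likewise for PRODUCT_ID):

```python
from collections import Counter

# Hypothetical in-memory sample of (userId, productId) purchase records.
purchases = [
    (123456, 123456789),
    (123456, 523485447),
    (789012, 123456789),
    (345678, 363478326),
    (123456, 123456789),
]

# One histogram per partitioning column: value -> number of records.
user_hist = Counter(user_id for user_id, _ in purchases)
product_hist = Counter(product_id for _, product_id in purchases)

print(user_hist[123456])        # 3
print(product_hist[123456789])  # 3
```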
  • Then, use the multifit algorithm to divide the values of each column into n balanced bins (n being the number of partitions you want):

    userId      bin
    123456      1
    789012      1
    345678      1
    901234      2
    ...         ...

    productId     bin
    123456789     1
    523485447     2
    363478326     2
    326484642     3
    ...           ...
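Any balanced multi-way partitioning heuristic works for this step. The sketch below uses a greedy longest-processing-time heuristic (simpler than the multifit algorithm named above, but it produces the same kind of balanced bin assignment): heaviest values first, each into the currently lightest bin.

```python
import heapq

def balanced_bins(hist, n):
    """Assign each value to one of n bins, keeping bin totals balanced.

    Greedy LPT heuristic (a stand-in for the multifit algorithm):
    returns {value: bin_number} with bin numbers 1..n.
    """
    # Min-heap of (current_total, bin_number) so the lightest bin pops first.
    heap = [(0, b) for b in range(1, n + 1)]
    heapq.heapify(heap)
    assignment = {}
    for value, count in sorted(hist.items(), key=lambda kv: -kv[1]):
        total, b = heapq.heappop(heap)
        assignment[value] = b
        heapq.heappush(heap, (total + count, b))
    return assignment

# The userId histogram from the table above, split into 2 bins.
user_hist = {123456: 1640, 789012: 932, 345678: 1849, 901234: 11}
print(balanced_bins(user_hist, 2))
```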
  • Then, store these bin assignments in the database.

  • Then, update your query to join on these tables so that every record gets its bin number:

url = 'jdbc:oracle:thin:username/password@address:port:dbname'

query = '''
(SELECT
  MY_TABLE.*,
  USER_PARTITION.BIN AS USER_BIN,
  PRODUCT_PARTITION.BIN AS PRODUCT_BIN
FROM MY_TABLE
LEFT JOIN USER_PARTITION
  ON MY_TABLE.USER_ID = USER_PARTITION.USER_ID
LEFT JOIN PRODUCT_PARTITION
  ON MY_TABLE.PRODUCT_ID = PRODUCT_PARTITION.PRODUCT_ID) MY_QUERY'''

df = spark.read\
     .option('driver', 'oracle.jdbc.driver.OracleDriver')\
     .jdbc(url=url, table=query, predicates=predicates)
  • And finally, generate the predicates. One for each partition, like these:
predicates = [
  'USER_BIN = 1 OR PRODUCT_BIN = 1',
  'USER_BIN = 2 OR PRODUCT_BIN = 2',
  'USER_BIN = 3 OR PRODUCT_BIN = 3',
  ...
  'USER_BIN = n OR PRODUCT_BIN = n',
]
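Rather than writing the list out by hand, the predicates can be generated from the bin count (here n_bins is whatever n you chose in the binning step):

```python
n_bins = 4  # the n used when binning; one predicate -> one Spark partition

predicates = [
    f'USER_BIN = {b} OR PRODUCT_BIN = {b}'
    for b in range(1, n_bins + 1)
]

print(predicates[0])  # USER_BIN = 1 OR PRODUCT_BIN = 1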
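Rather than writing the list out by hand, the predicates can be generated from the bin count (here n_bins is whatever n you chose in the binning step):

```python
n_bins = 4  # the n used when binning; one predicate -> one Spark partition

predicates = [
    f'USER_BIN = {b} OR PRODUCT_BIN = {b}'
    for b in range(1, n_bins + 1)
]

print(predicates[0])  # USER_BIN = 1 OR PRODUCT_BIN = 1
```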

The predicates are added to the query as WHERE clauses, which means that all the records of the users in partition 1 go to the same machine. Also, all the records of the products in partition 1 go to that same machine as well.

Note that there are no relations between the user and the product here. We don't care which products are in which partition or are sent to which machine. But since we want to perform some aggregations on both the users and the products (separately), we need to keep all the records of an entity (user or product) together. And using this method, we can achieve that without any shuffles.

Also, note that if there are some users or products whose records don't fit in the workers' memory, then you need to do a sub-partitioning. Meaning that you should first add a new random numeric column to your data (between 0 and some chunk_size like 10000 or something), then do the partitioning based on the combination of that number and the original IDs (like userId). This causes each entity to be split into fixed-sized chunks (i.e., 10000) to ensure it fits in the workers' memory. And after the aggregations, you need to group your data on the original IDs to aggregate all the chunks together and make each entity whole again.
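The sub-partitioning idea can be sketched with plain Python (all the data here is hypothetical, and the per-chunk partial sums stand in for whatever per-partition aggregation you run on the workers):

```python
import random
from collections import defaultdict

random.seed(0)   # deterministic for the example
chunk_count = 3  # hypothetical; pick so each (entity, chunk) fits in memory

# Hypothetical per-record purchase amounts for one heavy user.
records = [('user_123', amount) for amount in [10, 20, 30, 40, 50, 60]]

# 1) Tag each record with a random chunk number (the extra column).
tagged = [(uid, random.randrange(chunk_count), amt) for uid, amt in records]

# 2) First aggregation: partial sums per (entity, chunk). With the data
#    partitioned on (uid, chunk), this step needs no shuffle.
partial = defaultdict(int)
for uid, chunk, amt in tagged:
    partial[(uid, chunk)] += amt

# 3) Final aggregation: regroup on the original id (the one unavoidable
#    shuffle) to merge the chunks back into a whole entity.
total = defaultdict(int)
for (uid, _), s in partial.items():
    total[uid] += s

print(total['user_123'])  # 210
```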

The shuffle at the end is inevitable because of our memory restriction and the nature of our data, but this is the most efficient way you can achieve the desired results.

