python - Distribute dataset evenly by range of id in PySpark
Question
I'm very new to PySpark and have been struggling with partitioning data.
I have two datasets:
- An ads dataset (very big) with ad_id and some attribute columns
- An ad transactions dataset (smaller) with ad_id and a transaction date
It seems I can only partition by ad_id. My question is: how can I evenly distribute the data by ranges of ad_id for both datasets, so that a join between the two will be faster?
Here is what I'm trying:
ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))
Thanks!
Solution
Use bucketing
If you are using Spark v2.3 or later, you can use bucketing to avoid the shuffle that would otherwise happen on the join after the write.
With bucketing you store your data in buckets according to a column, typically the one you will join on. When Spark reads the data back from the buckets, it will not need to perform an exchange.
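The idea can be sketched in plain Python (a toy model only: Spark actually uses a Murmur3 hash of the column value internally, not Python's built-in `hash`):

```python
# Toy illustration of bucketing: rows from both tables are assigned to a
# bucket by hashing the join key modulo the number of buckets.
# NOTE: Python's hash() is only a stand-in for Spark's Murmur3 hashing.
NUM_BUCKETS = 30

def bucket_for(ad_id, num_buckets=NUM_BUCKETS):
    """Bucket index for a given join key."""
    return hash(ad_id) % num_buckets

transactions = [(30, 528749), (1, 552233), (60, 41661)]
brands = [(30, "McDonalds"), (1, "Coca-Cola")]

# Because both tables use the same key and the same bucket count, rows that
# can join always land in the same bucket -- no cross-bucket shuffle needed.
for ad_id, _ in transactions:
    for b_id, _ in brands:
        if ad_id == b_id:
            assert bucket_for(ad_id) == bucket_for(b_id)
```

The important point is that both sides must be bucketed on the same column with the same number of buckets; otherwise Spark still has to exchange data.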
1. Sample data
Transactions (fact)
t1.sample(n=5)
ad_id impressions
30 528749
1 552233
30 24298
30 311914
60 41661
Names (dimension)
t2.sample(n=5)
ad_id brand_name
1 McDonalds
30 McDonalds
30 Coca-Cola
1 Coca-Cola
30 Levis
2. Disable broadcast join
Since one table is large and the other small, you need to disable broadcastJoin:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
3. Without bucketing
t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)
t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')
unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")
unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()
+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
+- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
:- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336] <-- 0_0
: +- Project [ad_id#1842L, impressions#1843L]
: +- Filter isnotnull(ad_id#1842L)
: +- FileScan parquet default.unbucketed_transactions
+- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337] <-- 0_0
+- Project [ad_id#1846L, brand_name#1847]
+- Filter isnotnull(ad_id#1846L)
+- FileScan parquet default.unbucketed_brands
As you can see, an exchange takes place because of the unbucketed join.
4. With bucketing
# The first parameter (30) tells Spark how many buckets to create.
# The second parameter is the column the buckets should be based on.
unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')
unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')
transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")
transactions.join(brands, 'ad_id').explain()
+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
+- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
:- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
: +- Project [ad_id#1867L, impressions#1868L]
: +- Filter isnotnull(ad_id#1867L)
: +- FileScan parquet default.bucketed_transactions
+- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
+- Project [ad_id#1871L, brand_name#1872]
+- Filter isnotnull(ad_id#1871L)
+- FileScan parquet default.bucketed_brands
As you can see from the plan above, no exchange takes place any more. So you will improve your performance by avoiding the shuffle.
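To make it concrete why no exchange is needed, here is a small pure-Python sketch of a per-bucket join (a toy model, not Spark's implementation: each table is already split into the same buckets and sorted by the key, so the join can run bucket by bucket with no data movement; a nested loop is used per bucket for simplicity, where Spark would sort-merge):

```python
from collections import defaultdict

NUM_BUCKETS = 4  # small bucket count, just for the toy example

def to_buckets(rows, num_buckets=NUM_BUCKETS):
    """Split (key, value) rows into sorted buckets, like bucketBy + sortBy."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[hash(key) % num_buckets].append((key, value))
    for b in buckets.values():
        b.sort(key=lambda r: r[0])
    return buckets

def bucket_join(left, right):
    """Join two pre-bucketed tables bucket by bucket (no shuffle step)."""
    out = []
    for i in set(left) | set(right):
        for lk, lv in left.get(i, []):
            for rk, rv in right.get(i, []):
                if lk == rk:
                    out.append((lk, lv, rv))
    return out

transactions = to_buckets([(30, 528749), (1, 552233), (60, 41661)])
brands = to_buckets([(30, "McDonalds"), (1, "Coca-Cola"), (30, "Levis")])
joined = bucket_join(transactions, brands)
```

Because matching keys are guaranteed to sit in the same bucket on both sides, each bucket can be joined independently, which is exactly why the `Exchange hashpartitioning` steps disappear from the plan.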