首页 > 解决方案 > Spark 按 Key 分组并对数据进行分区

问题描述

我有一个大型 csv 文件,其中包含以下格式的数据。

cityId1,名称,地址,.......,zip

cityId2,姓名,地址,.......,zip

cityId1,名称,地址,.......,zip

...........

cityIdN,名称,地址,.......,zip

我正在对上述 csv 文件执行以下操作:

  1. 按 cityId 作为键和资源列表作为值分组

    df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")

  2. 将其更改为 jsonRDD

    val jsonDataRdd2 = df2.toJSON.rdd

  3. 遍历每个分区并按密钥上传到 s3

我的问题:

标签: apache-sparkapache-spark-sql

解决方案


在回答您的问题时:

  • 从二级存储(S3、HDFS)读取时,分区等于文件系统的块大小,128MB 或 256MB;但您可以立即重新分区 RDD,而不是数据帧。(对于 JDBC 和 Spark 结构化流,分区的大小是动态的。)

  • 当应用“宽转换”并重新分区时,分区的数量和大小很可能会发生变化。给定分区的大小具有最大值。在 Spark 2.4.x 中,分区大小增加到 8GB。因此,如果任何转换(例如 collect_list 与 groupBy 组合)生成超过此最大大小,您将收到错误并且程序中止。因此,您需要明智地进行分区,或者在您的情况下有足够数量的分区进行聚合 - 请参阅 spark.sql.shuffle.partitions 参数。

  • Spark 处理的并行模型依赖于通过散列、范围分区等分配的“键”分配到一个且只有一个分区 - 洗牌。所以,遍历一个分区foreachPartition,mapPartitions没有问题。


推荐阅读