apache-spark - Spark 按 Key 分组并对数据进行分区
问题描述
我有一个大型 csv 文件,其中包含以下格式的数据。
cityId1,名称,地址,.......,zip
cityId2,姓名,地址,.......,zip
cityId1,名称,地址,.......,zip
...........
cityIdN,名称,地址,.......,zip
我正在对上述 csv 文件执行以下操作:
按 cityId 作为键和资源列表作为值分组
df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")
将其更改为 jsonRDD
val jsonDataRdd2 = df2.toJSON.rdd
遍历每个分区并按密钥上传到 s3
- 由于业务逻辑限制(其他服务如何从 S3 读取),我无法使用 dataframe partitionby write
我的问题:
- spark分区的默认大小是多少?
- 假设分区的默认大小是 X MB,并且 dataFrame 中存在一个大记录,其键具有 Y MB 的数据 (Y > X),在这种情况下会发生什么?
- 在这种情况下,我是否需要担心在不同的分区中拥有相同的密钥?
解决方案
在回答您的问题时:
从二级存储(S3、HDFS)读取时,分区等于文件系统的块大小,128MB 或 256MB;但您可以立即重新分区 RDD,而不是数据帧。(对于 JDBC 和 Spark 结构化流,分区的大小是动态的。)
当应用“宽转换”并重新分区时,分区的数量和大小很可能会发生变化。给定分区的大小具有最大值。在 Spark 2.4.x 中,分区大小增加到 8GB。因此,如果任何转换(例如 collect_list 与 groupBy 组合)生成超过此最大大小,您将收到错误并且程序中止。因此,您需要明智地进行分区,或者在您的情况下有足够数量的分区进行聚合 - 请参阅 spark.sql.shuffle.partitions 参数。
Spark 处理的并行模型依赖于通过散列、范围分区等分配的“键”分配到一个且只有一个分区 - 洗牌。所以,遍历一个分区foreachPartition,mapPartitions没有问题。
推荐阅读
- reactjs - 如何从 useSelector 初始化 useState
- python - 应该如何调用使用 super() 检索的可调用对象?
- haskell - 使用一些“Coercible”用例避免“unsafeCoerce”
- flutter - Flutter,我无法在 TabView 中增加图标的大小
- javascript - 递归函数返回'未定义'
- maven - 用于测试与 repo 的连接的 Maven 标志?
- javascript - 向特定用户 socket.io 发送消息
- python - 在 Vercel 上托管 Quart 应用程序。部署时未找到 Quart
- android - 从 Fragment 返回时,Flow onEach/collect 被多次调用
- java - 具有反应式谓词的 Java 流过滤器