apache-spark - Collect failed in ... s due to Stage cancelled because SparkContext was shut down
Problem Description
I want to display the number of elements in each partition, so I wrote the following:
def count_in_a_partition(iterator):
    yield sum(1 for _ in iterator)
If I use it like this:
print("number of element in each partitions: {}".format(
my_rdd.mapPartitions(count_in_a_partition).collect()
))
I get the following:
19/02/18 21:41:15 INFO DAGScheduler: Job 3 failed: collect at /project/6008168/tamouze/testSparkCedar.py:435, took 30.859710 s
19/02/18 21:41:15 INFO DAGScheduler: ResultStage 3 (collect at /project/6008168/tamouze/testSparkCedar.py:435) failed in 30.848 s due to Stage cancelled because SparkContext was shut down
19/02/18 21:41:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/02/18 21:41:16 INFO MemoryStore: MemoryStore cleared
19/02/18 21:41:16 INFO BlockManager: BlockManager stopped
19/02/18 21:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/02/18 21:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_14 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_14 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_3 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_3 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 INFO SparkContext: Successfully stopped SparkContext
....
Note that my_rdd.take(1) returns:
[(u'id', u'text', array([-0.31921682, ...,0.890875]))]
How can I fix this?
Solution
You have to use the glom() function for this. Let's take an example.
Let's first create a DataFrame.
rdd = sc.parallelize([('a', 22), ('b', 1), ('c', 4), ('b', 1), ('d', 2), ('e', 0),
                      ('d', 3), ('a', 1), ('c', 4), ('b', 7), ('a', 2), ('f', 1)])
df = rdd.toDF(['key', 'value'])
df = df.repartition(5, "key")  # make 5 partitions, hash-partitioned on "key"
The number of partitions -
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
Number of partitions: 5
The number of rows/elements in each partition. This can give you an idea of data skew -
print('Partitioning distribution: ' + str(df.rdd.glom().map(len).collect()))
Partitioning distribution: [3, 3, 2, 2, 2]
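As an aside, a DataFrame-native way to get the same per-partition counts is spark_partition_id(), which tags each row with the id of the partition that holds it; only the aggregated counts reach the driver. A minimal sketch (the column name pid is just an illustrative choice):

from pyspark.sql.functions import spark_partition_id

# Tag each row with its partition id, then count rows per partition.
df.withColumn("pid", spark_partition_id()) \
    .groupBy("pid").count() \
    .orderBy("pid") \
    .show()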
Check how the rows are actually distributed across the partitions. Note that glom().collect() pulls the full contents of every partition back to the driver, so if the dataset is large this can crash your system with an out-of-memory (OOM) error.
print("Partitions structure: {}".format(df.rdd.glom().collect()))
Partitions structure: [
    # Partition 1
    [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)],
    # Partition 2
    [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)],
    # Partition 3
    [Row(key='c', value=4), Row(key='c', value=4)],
    # Partition 4
    [Row(key='e', value=0), Row(key='f', value=1)],
    # Partition 5
    [Row(key='d', value=2), Row(key='d', value=3)]
]
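Applied back to the original question, the per-partition count can be taken without materializing whole partitions on the driver. A minimal sketch, assuming my_rdd is the RDD from the question and an active SparkContext:

# Each partition is reduced to a single integer before anything is sent
# to the driver, so the OOM caveat above does not apply here.
print("number of elements in each partition: {}".format(
    my_rdd.glom().map(len).collect()
))

# An equivalent variant that also labels each count with its partition index.
def count_with_index(idx, iterator):
    yield idx, sum(1 for _ in iterator)

print(my_rdd.mapPartitionsWithIndex(count_with_index).collect())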