python-3.x - 在 Kubernetes 上运行 Spark 作业时,如何避免 Pod 的 DiskPressure 条件及其最终驱逐?
问题描述
我想重新分区数据集,然后将其写入目标路径。但是,由于DiskPressure
. Spark 只显示它失去了一个工人,但是当我events
在我的 OpenShift 控制台中看到 时,我看到 pod(worker) 被驱逐了。
这是我重新分区的方式:
df = df.repartition("created_year", "created_month", "created_day")
df.write.partitionBy("created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)
大约有 38k 个分区:
Job Id ▾
Description
Submitted
Duration
Stages: Succeeded/Total Tasks (for all stages): Succeeded/Total
1
parquet at NativeMethodAccessorImpl.java:0
(kill)parquet at NativeMethodAccessorImpl.java:0 2020/08/11 21:35:46 1.5 h 0/2
2166/38281 (5633 failed)
Spark配置如下:
def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
print('Spark cluster is: {}'.format(spark_cluster))
sc_conf = (
pyspark.SparkConf().setMaster(spark_cluster) \
.set('spark.driver.host', HOSTNAME) \
.set('spark.driver.port', 42000) \
.set('spark.driver.bindAddress', '0.0.0.0') \
.set('spark.driver.blockManager.port', 42100) \
.set('spark.executor.memory', '5G') \
.set('spark.driver.memory', '3G') \
.set('spark.sql.parquet.enableVectorizedReader', True) \
.set('spark.sql.files.ignoreCorruptFiles', True)
)
return sc_conf
我无法弄清楚导致 DiskPressure 的原因,我该如何阻止它?
我阅读了一些关于 DiskPressure 及其处理的答案和文章,但它们更通用,与 Spark 无关。
Spark 有 6 个工作人员,每个工作人员有 5GB 内存和 6 个内核。
解决方案
DiskPressure 是容器的磁盘使用量大幅增加的情况,因此运行 pod 的节点面临磁盘可用性的紧缩。这种紧缩可能会小于总可用性的 5-10%。
在这种情况下,kubelet 会在节点上设置 DiskPressure 状态(该节点还没有准备好调度),因此不会调度较新的 pod 并且会驱逐 pod(重新调度到其他可用性)以满足 pod 的正常运行时间。
面临磁盘压力的最常见情况是缺少日志轮换(调试日志),其他情况是在磁盘有限的节点上写入大量数据。
编辑:我的回答是通用的,并不特定于火花场景。
推荐阅读
- javascript - 将 highcharts 图例停靠在图表的最底部
- maven - 如何用maven修复外部依赖缺失的jar?
- laravel - 我在 laravel 中的路线有问题
- java - JTable 不断将数字排序为字符串
- xamarin - Xamarin Forms:将 DAL 方法调用到不同的页面
- android - flutter - 直接在材质应用程序中访问 MediaQuery.of(context).size
- c# - 有没有办法为子类属性设置一个值并且它出现在不同的基类属性中?
- ms-access - 从 MS Access 中的另一个表单获取文本框值
- elasticsearch - ELASTICSEARCH - 多个字段上的 Filter_path
- gulp - gulp - 默认加载子目录(example.com/en/)