apache-spark - 使用 HDFS 存储的 Spark 作业
问题描述
我在 Google Cloud Dataproc 上运行了一个长期运行的 Spark Structured Streaming Job,它使用 Kafka 作为源和接收器。我还将我的检查点保存在 Google Cloud Storage 中。
运行一周后,我注意到它正在稳步消耗所有 100 GB 磁盘存储空间,将文件保存到/hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...
.
我的理解是我的 Spark 作业不应该对本地磁盘存储有任何依赖。
我在这里完全误解了吗?
我像这样提交了我的工作:
(cd app/src/packages/ && zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
--cluster cluster-26aa \
--region us-central1 \
--properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
--py-files build/mypkg.zip \
--max-failures-per-hour 10 \
--verbosity info \
app/src/explode_rmq.py
这些是我工作的相关部分:
资源:
spark = SparkSession \
.builder \
.appName("MyApp") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile('mypkg.zip')
df = spark \
.readStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("subscribe", "lsport-rmq-12") \
.option("startingOffsets", "earliest") \
.load() \
.select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))
下沉:
sink_kafka_q = sink_df \
.writeStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("topic", "my_topic") \
.option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
.start()
解决方案
如果内存不够,Spark 会在本地磁盘上持久化信息。您可以像这样禁用磁盘上的持久性:
df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
或者你可以尝试像这样将信息序列化以占用更少的内存
df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
读取序列化数据将占用更多 CPU。
每个数据帧都有其独特的序列化级别。
欲了解更多信息:https ://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
推荐阅读
- java - 尝试从输入字段中读取日期值时获取未定义的值
- batch-file - 简单的 if 语句批处理
- java - 如何配置 Spring WS 以生成 SOAP 1.2
- visual-studio - 带有 Visual Studio 2017 的 3DS Max 插件向导
- algorithm - Dynamic Programming Solution for a Variant of Coin Exchange
- uml - How to express the relationship between java.time.LocalDate, DateTimeFormatter, String with UML
- git - An unexpected version directory Classes was encountered
- swift - Launching ViewController as sheet from windowDidLoad
- sql - Get the name of an ID for two separate columns
- java - SparkJava 可变参数个数