Error writing a DataFrame to Ceph storage

Problem description

In my organization, we are currently exploring Ceph as a replacement for HDFS for running AI/ML workloads. As part of this initiative, we set up a Ceph cluster and brought it into Kubernetes using Rook.

While testing Ceph, I was able to access the Ceph storage with the S3CMD CLI and could also read data from Ceph using Spark on Kubernetes. However, I get an error when writing data back to the Ceph storage.

Below are the code and the error I get when writing the data back. I hope someone can help resolve this.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.access.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.secret.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
        .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
        .config("spark.hadoop.fs.s3a.fast.upload","true") \
        .config("spark.eventLog.dir", "s3a://bucket/spark-event-log/") \
        .config("spark.executor.instances", "1") \
        .config("spark.executor.cores", "3") \
        .config("spark.executor.memory", "55g") \
        .config("spark.eventLog.enabled", "false") \
        .getOrCreate()

# Read Source Datasets 

musical_data = spark.read.json("s3a://bucket/input-data/Musical_Instruments_data.json")
musical_metadata = spark.read.json("s3a://bucket/input-data/Musical_Instruments_metadata.json")

# Register dataframes as temp tables 

musical_metadata.registerTempTable("musical_metadata")
musical_data.registerTempTable("musical_data")

# Top products based on unique user reviews

top_rated = spark.sql("""
select musical_data.asin as product_id, 
        count(distinct musical_data.reviewerID) as unique_reviewer_id_count, 
        musical_metadata.price as product_price
from musical_data left outer join musical_metadata
on musical_data.asin == musical_metadata.asin
group by product_id, product_price
order by unique_reviewer_id_count desc
limit 10
""")

# Display top 10 products

top_rated.show(truncate=False)

# Save output as csv

top_rated.write.format("csv") \
        .option("header","true") \
        .mode("overwrite") \
        .save("s3a://bucket/output-data/")

# Stop Spark Context to release resources 

spark.stop()

Error when writing the DataFrame:

Py4JJavaError: An error occurred while calling o740.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object  on output-data/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg:InvalidRequest: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2786)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2761)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2088)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2021)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1828)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1412)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1374)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5212)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5158)
    at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:398)
    at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6113)
    at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1817)
    at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1777)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1545)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2788)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 44 more

Versions -

Spark 3.0.1, Hadoop 3.2

Tags: apache-spark, amazon-s3, kubernetes, ceph

Solution


In the end, this worked in both client and cluster mode.

Cluster mode via spark-submit

./spark-submit \
--master k8s://https://xxxx:6443 \
--deploy-mode cluster \
--name prateek-ceph-pyspark \
--conf spark.kubernetes.namespace=jupyter \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=55g \
--conf spark.kubernetes.container.image=spark-executor-3.0.1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image.pullSecrets=gcr \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.endpoint=http://xxxx:8080 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.eventLog.enabled=false \
s3a://ceph-bucket/scripts/Ceph_PySpark.py

Client mode in a Jupyter Notebook -

Spark Config with AWS Credential Provider - org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.access.key", "xxxxx") \
        .config("spark.hadoop.fs.s3a.secret.key", "xxxxx") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{​​​​​​​}​​​​​​​:{​​​​​​​}​​​​​​​&quot;.format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "6") \
        .config("spark.executor.memory", "55g") \
        .getOrCreate()

Spark Config with AWS Credential Provider - com.amazonaws.auth.EnvironmentVariableCredentialsProvider

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{​​​​​​​}​​​​​​​:{​​​​​​​}​​​​​​​&quot;.format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "6") \
        .config("spark.executor.memory", "55g") \
        .getOrCreate()
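
With com.amazonaws.auth.EnvironmentVariableCredentialsProvider, the AWS SDK picks the keys up from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables rather than from the Spark configuration. A minimal sketch of how those variables could be supplied in client mode follows; exporting them via os.environ for the notebook driver and forwarding them to executor pods through spark.executorEnv.* are my assumptions, not something stated in the answer above.

import os

# Assumption: EnvironmentVariableCredentialsProvider reads these two variables from the
# process environment. In client mode the driver JVM is launched from the notebook
# process, so they should be exported before the SparkSession above is created.
os.environ["AWS_ACCESS_KEY_ID"] = "xxxxx"
os.environ["AWS_SECRET_ACCESS_KEY"] = "xxxxx"

# Assumption: executor pods read their own environment, so the same variables would
# need to reach them as well, e.g. when building the session:
#   .config("spark.executorEnv.AWS_ACCESS_KEY_ID", os.environ["AWS_ACCESS_KEY_ID"]) \
#   .config("spark.executorEnv.AWS_SECRET_ACCESS_KEY", os.environ["AWS_SECRET_ACCESS_KEY"]) \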

To resolve the client-mode write issue to Ceph, add all of the fs.s3a.committer.staging.* related properties mentioned here - https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html - to core-site.xml in both the driver and executor images.
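
For reference, a minimal core-site.xml sketch with the kind of staging-committer entries the linked committer documentation describes; the committer choice ("directory") and the values shown are illustrative assumptions, not the exact settings used here:

<configuration>
  <!-- Route S3A output through the S3A committer factory, per the Hadoop committer docs -->
  <property>
    <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
    <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
  </property>
  <!-- Illustrative choice: the "directory" staging committer -->
  <property>
    <name>fs.s3a.committer.name</name>
    <value>directory</value>
  </property>
  <!-- Staging committer properties; the values below are examples only -->
  <property>
    <name>fs.s3a.committer.staging.conflict-mode</name>
    <value>replace</value>
  </property>
  <property>
    <name>fs.s3a.committer.staging.tmp.path</name>
    <value>tmp/staging</value>
  </property>
  <property>
    <name>fs.s3a.committer.staging.unique-filenames</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.committer.staging.abort.pending.uploads</name>
    <value>true</value>
  </property>
</configuration>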

