apache-spark - Copying from S3 to HDFS with Spark
Problem description
I am getting a ConnectTimeoutException while writing a file from S3 to HDFS.
I tried adding timeout parameters:
import java.io.IOException
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._
val sparkSession:SparkSession=SparkSession.builder().master("yarn").appName("To hdfs").getOrCreate
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", "XXXXXXXXXXXXXXXXXxx")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.attempts.maximum", "30")
sparkSession.sparkContext.hadoopConfiguration.set("spark.speculation", "false")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.host","webproxy.e.corp.services")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.port","80")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled","true")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint","s3.us-east-1.amazonaws.com")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", "XXXXXXXXXXXXXXXXXx")
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sparkSession.sparkContext.hadoopConfiguration.set("spark.yarn.queue","root.ecpdevingest")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.connection.establish.timeout","10000")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.connection.timeout","50000")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.threads.max","100")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.threads.core","5")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.multipart.size","104857600")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.multipart.threshold","2147483647")
sparkSession.sparkContext.hadoopConfiguration.set("spark.executor.instances", "8")
sparkSession.sparkContext.hadoopConfiguration.set("spark.executor.cores", "4")
sparkSession.sparkContext.hadoopConfiguration.set("spark.executor.memory", "32g")
sparkSession.sparkContext.hadoopConfiguration.set("spark.driver.memory", "4g")
sparkSession.sparkContext.hadoopConfiguration.set("spark.driver.cores", "2")
val file = sparkSession.sparkContext.textFile("s3a://acrXXXXXXXXXXXXXXXXX5.avro", 11)
file.saveAsObjectFile("hdfs://c411apy.int.westgroup.com:8020/project/ecpdevingest/avro/100")
The error is:
org.apache.http.conn.ConnectTimeoutException: Connect to acron-avro-bucket.s3.amazonaws.com:443 timed out
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:416)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:180)
at org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:151)
at org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:125)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:643)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:479)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStora
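Solution
- Nobody needs to set the fs.s3a.impl option. Ever. It is a superstition that has been passed down through Spark docs and that everyone repeats without understanding it.
- If you do set it, don't set fs.s3a.impl to the S3N implementation class, org.apache.hadoop.fs.s3native.NativeS3FileSystem. You have done exactly that, so none of the fs.s3a options are being read or used. The org.jets3t frames in the stack trace are consistent with this: jets3t is the HTTP client library behind S3N, not S3A.
Delete that line and try again.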
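As a rough sketch of what the job might look like with that line removed (not the answerer's own code), the version below drops fs.s3a.impl and keeps the proxy and timeout settings from the question. It makes a few assumptions beyond the answer: the credentials use the S3A key names fs.s3a.access.key and fs.s3a.secret.key, they are read from the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (a hypothetical setup), the hadoop-aws module is on the classpath, and the source and destination paths are passed as arguments. The spark.* settings are moved to the session builder, since values written into hadoopConfiguration are not read as Spark settings.

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

object S3ToHdfs {
  def main(args: Array[String]): Unit = {
    // args(0): s3a:// source path, args(1): hdfs:// destination path (both supplied by the caller).
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("To hdfs")
      // spark.* options go on the builder or on spark-submit, not on hadoopConfiguration.
      .config("spark.speculation", "false")
      .getOrCreate()

    val hadoopConf = spark.sparkContext.hadoopConfiguration

    // No fs.s3a.impl here: the s3a:// scheme is left to resolve to its default implementation.
    // Assumption: credentials come from these environment variables.
    hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

    // Proxy and timeout values copied from the question.
    hadoopConf.set("fs.s3a.proxy.host", "webproxy.e.corp.services")
    hadoopConf.set("fs.s3a.proxy.port", "80")
    hadoopConf.set("fs.s3a.connection.establish.timeout", "10000")
    hadoopConf.set("fs.s3a.connection.timeout", "50000")

    // Sanity check: print which FileSystem class the s3a scheme resolves to.
    // Throws an IOException if no implementation for the scheme is available.
    val s3aClass = FileSystem.getFileSystemClass("s3a", hadoopConf)
    println(s"s3a:// resolves to ${s3aClass.getName}")

    val file = spark.sparkContext.textFile(args(0), 11)
    file.saveAsObjectFile(args(1))

    spark.stop()
  }
}
```

If the printed class is still org.apache.hadoop.fs.s3native.NativeS3FileSystem, some other configuration source (for example a core-site.xml on the cluster) is presumably still overriding fs.s3a.impl.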