Unable to connect to S3 with the spark-redshift library in Java

Problem description

I am trying to create a table in Redshift from a Spark dataset. I am using the spark-redshift driver over JDBC and running the job locally. Here is the code snippet that performs the write:

data.write()
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://..")
.option("dbtable", "test_table")
.option("tempdir", "s3://temp")
.option("aws_iam_role", "arn:aws:iam::..")
.option("extracopyoptions", "region 'us-west-1'")
.mode(SaveMode.Append).save();

My Maven pom.xml has the following dependency:

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-redshift_2.11</artifactId>
   <version>2.0.1</version>
</dependency>

I am using Java 1.8. I get the following error:

java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at com.databricks.spark.redshift.Utils$.assertThatFileSystemIsNotS3BlockFileSystem(Utils.scala:156)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:340)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at com.peak.spark.jobs.SparkDataIngestJob.writeData(SparkDataIngestJob.java:196)
    at com.peak.spark.jobs.SparkDataIngestJob.exec(SparkDataIngestJob.java:123)
    at com.peak.spark.core.AbstractSparkJob.run(AbstractSparkJob.java:74)
    at com.peak.spark.core.SparkAppLauncher.onApplicationEvent(SparkAppLauncher.java:40)
    at com.peak.spark.core.SparkAppLauncher.onApplicationEvent(SparkAppLauncher.java:16)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:151)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:128)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:331)
    at org.springframework.context.support.AbstractApplicationContext.start(AbstractApplicationContext.java:1174)
    at com.peak.spark.core.SparkApp.launch(SparkApp.java:38)
    at com.peak.spark.core.SparkApp.main(SparkApp.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.RuntimeException: Spark Job FailedNo FileSystem for scheme: s3
    at com.peak.spark.jobs.SparkDataIngestJob.exec(SparkDataIngestJob.java:162)
    at com.peak.spark.core.AbstractSparkJob.run(AbstractSparkJob.java:74)
    at com.peak.spark.core.SparkAppLauncher.onApplicationEvent(SparkAppLauncher.java:40)
    at com.peak.spark.core.SparkAppLauncher.onApplicationEvent(SparkAppLauncher.java:16)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:151)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:128)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:331)
    at org.springframework.context.support.AbstractApplicationContext.start(AbstractApplicationContext.java:1174)
    at com.peak.spark.core.SparkApp.launch(SparkApp.java:38)
    at com.peak.spark.core.SparkApp.main(SparkApp.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Please help me figure out what is going wrong here.

Tags: java, apache-spark, spark-redshift

Solution


Since you are running this code on your local machine, it has no way to access the S3 filesystem: no S3 FileSystem implementation is available locally, which is why the write fails with "No FileSystem for scheme: s3" when spark-redshift tries to use the tempdir.

You can do one of two things to work around this:

  1. Configure AWS credentials on your system so that your code can actually reach the S3 bucket. For various reasons, I would not recommend this approach.
  2. Keep the file path in a configuration file. Use two configuration files, one for testing the code and one for the production environment. In the test configuration use a local path such as c:\path\to\your\dummy\folder\, and in the production configuration use a path such as s3://your_bucket_name/path/in/bucket (see the sketch after this list).
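
A minimal sketch of the second option, assuming hypothetical property files named env-test.properties and env-prod.properties on the classpath, each containing a tempdir key (the file names, the env system property, and the key are illustrative, not part of the original post; SparkDataIngestJob is the job class from the stack trace):

import java.io.InputStream;
import java.util.Properties;

// Hypothetical per-environment config: env-test.properties and env-prod.properties,
// each with a "tempdir" entry (local folder for tests, S3 bucket for production).
Properties props = new Properties();
String env = System.getProperty("env", "test"); // e.g. pass -Denv=prod on the production cluster
try (InputStream in = SparkDataIngestJob.class.getResourceAsStream("/env-" + env + ".properties")) {
    props.load(in); // throws if the file is missing, so a bad environment name fails fast
}

data.write()
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://..")
.option("dbtable", "test_table")
.option("tempdir", props.getProperty("tempdir")) // resolved from the environment-specific file
.option("aws_iam_role", "arn:aws:iam::..")
.option("extracopyoptions", "region 'us-west-1'")
.mode(SaveMode.Append).save();

This keeps the job code identical across environments; only the properties file (and the env flag used to pick it) changes between your local test run and the cluster.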

Hope this helps.

