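apache-spark - Writing the final Dataset output from Spark (Java) to S3
Problem description
I can't find the right way to write data from a Spark Dataset to S3. What additional configuration do I need to add? Do I have to specify the AWS configuration in my code, or will it be picked up from the local .aws/ profile?
Please advise.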
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class sparkSqlMysql {
    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);
    private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
            .getOrCreate();

    public static void main(String[] args) {
        // JDBC connection properties
        final Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "password");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        // Each source table is wrapped in a subquery so it can be passed as a JDBC "table"
        final String dbTable = "(select * from Fielding) t";
        final String dbTable1 = "(select * from Salaries) m";
        final String dbTable2 = "(select * from Pitching) n";

        // Load MySQL query results as Datasets
        Dataset<Row> jdbcDF2 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable,
                connectionProperties);
        Dataset<Row> jdbcDF3 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable1,
                connectionProperties);
        Dataset<Row> jdbcDF4 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable2,
                connectionProperties);

        // Register the Datasets as temporary views so they can be queried with Spark SQL
        jdbcDF2.createOrReplaceTempView("Fielding");
        jdbcDF3.createOrReplaceTempView("Salaries");
        jdbcDF4.createOrReplaceTempView("Pitching");

        // Average salary per year for players with fielding records
        Dataset<Row> sqlDF = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Fielding "
                        + "from Salaries inner join Fielding "
                        + "ON Salaries.yearID = Fielding.yearID AND Salaries.playerID = Fielding.playerID "
                        + "group by Salaries.yearID limit 5");
        // Average salary per year for players with pitching records
        Dataset<Row> sqlDF1 = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Pitching "
                        + "from Salaries inner join Pitching "
                        + "ON Salaries.yearID = Pitching.yearID AND Salaries.playerID = Pitching.playerID "
                        + "group by Salaries.yearID limit 5");
        // sqlDF.show();
        // sqlDF1.show();
        sqlDF.createOrReplaceTempView("avg_fielding");
        sqlDF1.createOrReplaceTempView("avg_pitching");

        // Join the two per-year averages on yearID
        Dataset<Row> final_query_1_output = sparkSession.sql(
                "select avg_fielding.yearID, avg_fielding.Fielding, avg_pitching.Pitching "
                        + "from avg_fielding inner join avg_pitching "
                        + "ON avg_pitching.yearID = avg_fielding.yearID");
        final_query_1_output.show();
    }
}
The output of the query is:
final_query_1_output.show();
+------+------------------+------------------+
|yearID|          Fielding|          Pitching|
+------+------------------+------------------+
|  1990|  507978.625320787| 485947.2487437186|
|  2003|2216200.9609838845|2133800.1867612293|
|  2007|2633213.0126475547|2617533.3393665156|
|  2015|3996199.5729421354| 3955581.121535181|
|  2006| 2565803.492487479| 2534756.866972477|
+------+------------------+------------------+
I want to write this dataset to S3. How can I do that?
final_query_1_output.write().mode("overwrite").save("s3n://druids3migration/data.csv");
Solution
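A minimal sketch of one possible approach, assuming the `hadoop-aws` module and a matching AWS SDK are on the classpath. The `s3n://` connector was removed in Hadoop 3, so the sketch targets `s3a://` instead. Credentials can be set explicitly through `fs.s3a.access.key` and `fs.s3a.secret.key` (Spark forwards any `spark.hadoop.*` property to the Hadoop configuration). If they are left unset, S3A falls back to whatever credential providers are configured in `fs.s3a.aws.credentials.provider`; whether `~/.aws/credentials` is consulted depends on that setting and on the Hadoop version, so it is worth checking rather than assuming. `S3aWriteSettings`, `sparkProperties`, and `toS3aUri` are hypothetical names introduced here for illustration; they are not part of Spark or Hadoop. Reading the keys from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables is also an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Spark or Hadoop): gathers S3A settings in one place. */
public class S3aWriteSettings {

    /**
     * Builds Spark properties that carry S3A credentials to Hadoop.
     * Spark copies any "spark.hadoop.*" property into the Hadoop configuration.
     * If either key is null, nothing is set and credential lookup is left
     * to the connector's configured provider chain.
     */
    public static Map<String, String> sparkProperties(String accessKey, String secretKey) {
        Map<String, String> props = new LinkedHashMap<>();
        if (accessKey != null && secretKey != null) {
            props.put("spark.hadoop.fs.s3a.access.key", accessKey);
            props.put("spark.hadoop.fs.s3a.secret.key", secretKey);
        }
        return props;
    }

    /** Rewrites an s3n:// URI to the s3a:// scheme; other URIs are returned unchanged. */
    public static String toS3aUri(String uri) {
        return uri.replaceFirst("^s3n://", "s3a://");
    }

    public static void main(String[] args) {
        // Assumption: credentials come from the standard AWS environment variables.
        Map<String, String> props = sparkProperties(
                System.getenv("AWS_ACCESS_KEY_ID"), System.getenv("AWS_SECRET_ACCESS_KEY"));
        String target = toS3aUri("s3n://druids3migration/data.csv");

        System.out.println(target); // prints s3a://druids3migration/data.csv
        props.keySet().forEach(System.out::println); // property names only, never the secrets

        // With Spark and hadoop-aws on the classpath, the values could be applied roughly like this:
        //   SparkSession.Builder builder = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs");
        //   props.forEach((k, v) -> builder.config(k, v));
        //   SparkSession spark = builder.getOrCreate();
        //   final_query_1_output.write().mode("overwrite").option("header", "true").csv(target);
    }
}
```

Two things to keep in mind about the write itself. Calling `save(...)` without a format writes Parquet by default, regardless of the `.csv` suffix in the path, so the commented usage calls `.csv(...)` explicitly. In either case Spark writes a directory of part files at that path, not a single file named `data.csv`.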