apache-spark - Apache Spark 写入需要数小时
问题描述
我有一个基于两个表进行正确连接的 spark 作业,读取和连接速度非常快,但是当尝试将连接结果插入到 cassandra db 时,速度太慢了。插入1000行需要30多分钟,插入9行记录需要3分钟。请看我下面的配置。我们有 3 个 cassandra 和 spark 节点,并且为所有节点安装了 spark。我对 Spark 很陌生,无法理解出了什么问题。我可以使用 dse 驱动程序在不到 1 秒的时间内插入相同大小的数据(超过 2000 行)。感谢您的时间和帮助!
火花提交:
"dse -u " + username + " -p " + password + " spark-submit --class com.SparkJoin --executor-memory=20G " +
"SparkJoinJob-1.0-SNAPSHOT.jar " + filterMap.toString() + "
Spark核心版本:2.7.2
火花卡桑德拉连接器_2.11:2.3.1
火花-sql_2.11:2.3.1
火花会议
SparkConf conf = new SparkConf(true).setAppName("Appname");
conf.set("spark.cassandra.connection.host", host);
conf.set("spark.cassandra.auth.username", username);
conf.set("spark.cassandra.auth.password", password);
conf.set("spark.network.timeout", "600s");
conf.set("spark.cassandra.connection.keep_alive_ms", "25000");
conf.set("spark.cassandra.connection.timeout_ms", "5000000");
conf.set("spark.sql.broadcastTimeout", "5000000");
SparkContext sc = new SparkContext(conf);
SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();
sqlContext.setConf("spark.cassandra.connection.host", host);
sqlContext.setConf("spark.cassandra.auth.username", username);
sqlContext.setConf("spark.cassandra.auth.password", password);
sqlContext.setConf("spark.network.timeout", "600s");
sqlContext.setConf("spark.cassandra.connection.keep_alive_ms", "2500000");
sqlContext.setConf("spark.cassandra.connection.timeout_ms", "5000000");
sqlContext.setConf("spark.sql.broadcastTimeout", "5000000");
sqlContext.setConf("spark.executor.heartbeatInterval", "5000000");
sqlContext.setConf("spark.sql.crossJoin.enabled", "true");
左右表获取;
Dataset<Row> resultsFrame = sqlContext.sql("select * from table where conditions");
return resultsFrame.map((MapFunction<Row, JavaObject>) row -> {
// some operations here
return obj;
}, Encoders.bean(JavaObject.class)
);
加入
Dataset<Row> result = RigtTableJavaRDD.join(LeftTableJavaRDD,
(LeftTableJavaRDD.col("col1").minus(RigtTableJavaRDD.col("col2"))).
between(new BigDecimal("0").subtract(twoHundredMilliseconds), new BigDecimal("0").add(twoHundredMilliseconds))
.and(LeftTableJavaRDD.col("col5").equalTo(RigtTableJavaRDD.col("col6")))
, "right");
插入结果
CassandraJavaUtil.javaFunctions(resultRDD.javaRDD()).
writerBuilder("keyspace", "table", CassandraJavaUtil.mapToRow(JavaObject.class)).
saveToCassandra();
解决方案
推荐阅读
- java - 在 debian 打包中强制使用 java 版本
- node.js - 如何从我的包用户的导入语句中删除输出文件夹?
- python-3.x - 如何通过python子进程执行具有两个输出重定向的diff命令?
- php - 我如何将购物车项目价格存储为 woocommerce 订单中的新行元项目
- c++ - 不同内存位置中的 C++ 默认数组值
- awk - 从制表符分隔的命令输出中拆分管道分隔列的元素
- javascript - 为 IE10 转换 Javascript
- sql-server - SQL Server TEMP_DB_CONTENTION
- python - pybind11 在构造函数调用时崩溃并参考
- apple-push-notifications - 如何将密钥库证书绑定到 apns 通知中心