apache-spark - 无法使用 Spark-Mongo 驱动程序从多个数据库加载
问题描述
我正在研究 spark 模块,我需要从多个源(数据库)加载集合,但我无法从第二个数据库获取集合。
数据库
DB1
L_coll1
DB2
L_coll2
逻辑代码
String mst ="local[*]";
String host= "localhost";
String port = "27017";
String DB1 = "DB1";
String DB2 = "DB2";
SparkConf conf = new SparkConf().setAppName("cust data").setMaster(mst);
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.mongodb.input.uri", "mongodb://"+host+":"+port+"/")
.config("spark.mongodb.input.database",DB1)
.config("spark.mongodb.input.collection","coll1")
.getOrCreate();
SparkSession spark1 = SparkSession
.builder()
.config(conf)
.config("spark.mongodb.input.uri", "mongodb://"+host+":"+port+"/")
.config("spark.mongodb.input.database",DB2)
.config("spark.mongodb.input.collection","coll2")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaSparkContext jsc1 = new JavaSparkContext(spark1.sparkContext());
读取配置
ReadConfig readConfig = ReadConfig.create(spark);
Dataset<Row> MongoDatset = MongoSpark.load(jsc,readConfig).toDF();
MongoDatset.show();
ReadConfig readConfig1 = ReadConfig.create(spark1);
Dataset<Row> MongoDatset1 = MongoSpark.load(jsc1,readConfig1).toDF();
MongoDatset1.show();
运行 about 代码后,我多次获得第一个数据集。如果我评论第一个SparkSession spark
实例而不是仅从第二个 db 获取集合DB2
。
解决方案
ReadConfig
您可以使用's override 选项来获取多个数据库和集合,而不是使用多个 spark 会话。
创建火花会话
String DB = "DB1";
String DB1 = "DB2";
String Coll1 ="Coll1";
String Coll2 ="Coll2";
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
获取数据库功能
private static Dataset<Row> getDB(JavaSparkContext jsc_, String DB, String Coll1) {
// Create a custom ReadConfig
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("database",DB );
readOverrides.put("collection", Coll1);
readOverrides.put("readPreference.name", "secondaryPreferred");
System.out.println(readOverrides);
ReadConfig readConfig = ReadConfig.create(jsc_).withOptions(readOverrides);
return MongoSpark.load(jsc_,readConfig).toDF();
}
使用 getDB 创建多个数据库
Dataset<Row> MongoDatset1 = getDB(jsc, DB, Coll1);
Dataset<Row> MongoDatset2 = getDB(jsc, DB1, Coll2);
MongoDatset1.show(1);
MongoDatset2.show(1);
推荐阅读
- c# - 滚动问题 - 当我向上移动滚动条时不回滚
- javascript - 使用 typescript/javascript 解构对象
- powershell - Connect-MsolService -Credential 不再起作用了吗?
- sql - 与错误字段上的 referencedColumnName 一对一映射
- javascript - 使用 switch 语句更改 HTML 元素的文本内容的问题
- reactjs - 如何自定义 react-jinke-music-player 的图标
- mongodb - Scala Spark Mongo - 使用“in”子句过滤
- firebase - 防止 DDoS 和账单增加 - Firestore 公共数据
- java - 如何用新值更新 JSONArray?
- mysql - 带有 MAX() 的 GROUP BY 返回错误的行 ID