mongodb - Java-Spark-Mongo: filter(dataset.col(newTime).$greater(oldTime)) does not run on the full dataset
Problem description
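I wrote a Java Spark job that uses the Mongo connector. It is supposed to fetch from MongoDB all rows whose createdDate column is greater than the createdDate of the previous run. That value is a high-water mark: the maximum createdDate of each run, which I store in Oracle. Initially, the high-water mark in Oracle is 1900-01-01 00:00:00.000.
The createdDate column is of type ISODate in MongoDB.
In my MongoDB data, the maximum value stored for createdDate is 2018-04-11 01:43:20.165.
However, the filter in the code does not work as expected: on the first run it sometimes fetches rows only up to 2018-03-30 21:48:59.519, and only on the second or third run does it fetch up to the maximum value (2018-04-11 01:43:20.165).
Ideally, that should happen on the first run, when the initial high-water mark is 1900-01....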
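For reference, the intended behavior described above can be modeled in plain Java, without Spark or MongoDB. This is only an illustrative sketch: the class HighWaterMarkModel and its methods are hypothetical names that do not appear in the job, and a List stands in for the createdDate values of the collection.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original job: a plain-Java model of the
// intended high-water-mark logic, with a List standing in for the MongoDB rows.
final class HighWaterMarkModel {

    // Keep only the values strictly greater than the stored high-water mark.
    static List<Timestamp> newerThan(List<Timestamp> createdDates, Timestamp highWaterMark) {
        List<Timestamp> result = new ArrayList<>();
        for (Timestamp t : createdDates) {
            if (t.after(highWaterMark)) {
                result.add(t);
            }
        }
        return result;
    }

    // Largest value among the fetched rows, or null when nothing was fetched.
    static Timestamp max(List<Timestamp> fetched) {
        Timestamp max = null;
        for (Timestamp t : fetched) {
            if (max == null || t.after(max)) {
                max = t;
            }
        }
        return max;
    }

    // High-water mark after a run: it advances only if a newer row was fetched.
    static Timestamp nextHighWaterMark(List<Timestamp> createdDates, Timestamp current) {
        Timestamp max = max(newerThan(createdDates, current));
        return (max != null && max.after(current)) ? max : current;
    }
}
```

With a starting mark of 1900-01-01 00:00:00.000, a single call to nextHighWaterMark over the full data should already return the overall maximum, which is the first-run behavior the question expects.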
Here is my code:
package mongo;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.sql.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.bson.Document;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import java.sql.Timestamp;
public final class MongoRead
{
    private static Connection con=null;
    private static String readHighWaterMark(String table, String oraConn, String oraUser, String oraPswd) throws Exception
    {
        String highWaterMarkValue = "";
        try
        {
            con=DriverManager.getConnection(oraConn,oraUser,oraPswd);
            Statement stmt=con.createStatement();
            ResultSet rs=stmt.executeQuery("select * from difa.HIGH_WATER_MARK_TABLE where table_nm='"+table+"'");
            while(rs.next()){
                highWaterMarkValue = rs.getString(3);
            }
        }
        catch(Exception e){
            e.printStackTrace();
            con.close();
        }
        return highWaterMarkValue;
    }
    private static void setHighWaterMark(String key, String value) throws Exception
    {
        PreparedStatement pStmt=con.prepareStatement("UPDATE high_water_mark_table SET high_water_mark_VALUE='"+value+"' where table_nm='"+key+"'");
        int i=pStmt.executeUpdate();
        System.out.println(i+" records updated");
    }
    public static void main(final String[] args) throws Exception {
        if(args.length<8){
            System.out.println("Please provide correct inputs");
            System.exit(1);
        }
        String mongoAddress = args[0];
        String clusterAddress = args[1];
        String oraConn = args[2];
        String oraUser = args[3];
        String oraPswd = args[4];
        String tableNm = args[5];
        String highWaterCol = args[6];
        String loadType = args[7];
        SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkRecordReader")
            .config("spark.mongodb.input.uri", mongoAddress)
            .config("spark.mongodb.output.uri", mongoAddress)
            .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        try{
            FileSystem fs = FileSystem.get(new URI(clusterAddress),jsc.hadoopConfiguration());
            fs.delete(new Path(clusterAddress),true);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        /* ********Read data from MongoDB******* */
        Dataset<Row> dataset = MongoSpark.load(jsc).toDF();
        if(loadType.equalsIgnoreCase("I")){
            String highWaterMark = readHighWaterMark(tableNm,oraConn,oraUser,oraPswd);
            System.out.println("============HIGH_WATER_MARK_VALUE: "+highWaterMark);
            Timestamp oldTime = Timestamp.valueOf(highWaterMark.replace("T"," ").replace("Z", ""));
            // Fetches records where createdDate is greater than the previous high-water mark.
            Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();
            filtered.toJSON().write().text(clusterAddress);
            // Calculates MAX(createdDate) over the fetched dataset.
            Dataset<Row> maxHighWaterRow = filtered.agg(max(filtered.col(highWaterCol)).alias("newHighWater")).persist();
            List<Timestamp> newHighWaterValue = maxHighWaterRow.select("newHighWater").as(Encoders.TIMESTAMP()).collectAsList();
            Timestamp maxHighWaterMarkValue = newHighWaterValue.iterator().next();
            SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            Timestamp oldDate = Timestamp.valueOf(highWaterMark.replace('T', ' ').replace("Z",""));
            // Sets HIGH_WATER_MARK_VALUE if a greater value is detected.
            if(maxHighWaterMarkValue !=null && maxHighWaterMarkValue.after(oldDate)){
                setHighWaterMark(tableNm,dtFormat.format(maxHighWaterMarkValue).replace(" ", "T").concat("Z"));
            }
        }
        else{
            dataset.toJSON().write().text(clusterAddress);
        }
        // con is only opened in the incremental ("I") branch, so guard against null.
        if(con != null){
            con.close();
        }
        jsc.close();
    }
}
Any idea why filter and $greater are not fetching the records correctly?
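The string handling around the high-water mark in the code above can be isolated and checked without Spark. The sketch below mirrors the Timestamp.valueOf and SimpleDateFormat calls from the job; the class name WatermarkFormat and its two methods are hypothetical. Note that both directions use the JVM's default time zone, so the trailing "Z" is treated as a plain character rather than as UTC. Whether that matters for the comparison against an ISODate column is an open assumption here, not something the question confirms.

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

// Hypothetical helper that mirrors the high-water-mark string handling in the job.
final class WatermarkFormat {

    // "2018-04-11T01:43:20.165Z" -> Timestamp, interpreted in the JVM's default
    // time zone (the 'T' and 'Z' are simply removed, 'Z' is not read as UTC).
    static Timestamp parse(String stored) {
        return Timestamp.valueOf(stored.replace('T', ' ').replace("Z", ""));
    }

    // Timestamp -> "2018-04-11T01:43:20.165Z", again in the default time zone.
    static String format(Timestamp value) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        return fmt.format(value);
    }
}
```

A value written by format and read back by parse yields the same Timestamp, so the stored mark itself is stable between runs.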
Solution
I solved this by adding .persist() to the Dataset, as follows:
/* ********Read data from MongoDB******* */
Dataset<Row> dataset = MongoSpark.load(jsc).toDF().persist();
....
..
...
Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();
I don't know why, without persist(), the filter was not running on the whole dataset.
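The answer leaves the cause open. One general Spark behavior that may be relevant, though it is not confirmed as the cause here, is that Datasets are evaluated lazily: a Dataset that is not persisted is recomputed from its source for each action, while a persisted one is read once and reused. The toy model below uses no Spark at all; LazyVersusCached and its methods are hypothetical names, and a Supplier stands in for a read of the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: a Supplier stands in for a lazy, uncached read of the source.
final class LazyVersusCached {

    // Counts elements greater than the cutoff. Each call invokes the supplier
    // again, the way each Spark action recomputes an unpersisted Dataset.
    static long countGreater(Supplier<List<Integer>> source, int cutoff) {
        return source.get().stream().filter(v -> v > cutoff).count();
    }

    // Reads the source once and returns the same snapshot on every call,
    // roughly what persist() provides once the data has been materialized.
    static Supplier<List<Integer>> cached(Supplier<List<Integer>> source) {
        List<Integer> snapshot = new ArrayList<>(source.get());
        return () -> snapshot;
    }
}
```

If the uncached read can return different data from one evaluation to the next, two calls to countGreater on the raw supplier may disagree, whereas calls on the cached supplier always see the same rows.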