Java-Spark-Mongo: filter(dataset.col(newTime).$greater(oldTime)) not running on the full dataset

Problem Description

I wrote a Java-Spark job using the MongoDB connector. It is supposed to fetch from MongoDB all rows whose createdDate column is greater than the previous run's createdDate (i.e. the maximum high-water-mark value I store in Oracle for each run; initially, the high-water-mark value in Oracle is 1900-01-01 00:00:00.000).
The createdDate column is of type ISODate in MongoDB.

In my MongoDB data, the maximum value stored for createdDate is 2018-04-11 01:43:20.165.
But the filter in the code is not working as expected: on the first run it sometimes only fetches rows up to 2018-03-30 21:48:59.519, and only on the second or third run does it fetch up to the maximum (2018-04-11 01:43:20.165).
Ideally that should already happen on the very first run, when the initial high-water-mark value is 1900-01-01 00:00:00.000.
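In essence, each incremental run boils down to the comparison below (just a simplified excerpt of the full program further down, with createdDate written literally where the code uses the highWaterCol argument):

// high-water-mark string from Oracle, e.g. "1900-01-01T00:00:00.000Z", parsed to java.sql.Timestamp
Timestamp oldTime = Timestamp.valueOf(highWaterMark.replace("T", " ").replace("Z", ""));
// keep only documents whose createdDate is newer than the previous run's high water mark
Dataset<Row> filtered = dataset.filter(dataset.col("createdDate").$greater(oldTime));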
Here is the code:

package mongo;

import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.sql.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.bson.Document;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import java.sql.Timestamp;

public final class MongoRead
{
    private static Connection con=null;
    private static String readHighWaterMark(String table, String oraConn, String oraUser, String oraPswd) throws Exception
    {
        String highWaterMarkValue = "";
        try
        {       
            con=DriverManager.getConnection(oraConn,oraUser,oraPswd);
            Statement stmt=con.createStatement();
            ResultSet rs=stmt.executeQuery("select * from difa.HIGH_WATER_MARK_TABLE where table_nm='"+table+"'");
            while(rs.next()){
                highWaterMarkValue = rs.getString(3);
            }
        }
        catch(Exception e){
            e.printStackTrace();
            con.close();
        }

        return highWaterMarkValue;
    }

    private static void setHighWaterMark(String key, String value) throws Exception
    {
        PreparedStatement pStmt=con.prepareStatement("UPDATE high_water_mark_table SET high_water_mark_VALUE='"+value+"' where table_nm='"+key+"'");
        int i=pStmt.executeUpdate();
        System.out.println(i+" records updated");

    }

    public static void main(final String[] args) throws Exception {
        if(args.length<8){
            System.out.println("Please provide correct inputs");
            System.exit(1);
        }
        String mongoAddress = args[0];
        String clusterAddress = args[1];
        String oraConn = args[2];
        String oraUser = args[3];
        String oraPswd = args[4];
        String tableNm = args[5];
        String highWaterCol = args[6];
        String loadType = args[7];


        SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkRecordReader")
            .config("spark.mongodb.input.uri", mongoAddress)
            .config("spark.mongodb.output.uri", mongoAddress)
            .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        try{
            FileSystem fs = FileSystem.get(new URI(clusterAddress),jsc.hadoopConfiguration());
            fs.delete(new Path(clusterAddress),true);
        }
        catch(Exception e){
            e.printStackTrace();
        }


        /* ********Read data from MongoDB******* */
        Dataset<Row> dataset = MongoSpark.load(jsc).toDF();

        if(loadType.equalsIgnoreCase("I")){
            String highWaterMark = readHighWaterMark(tableNm,oraConn,oraUser,oraPswd);
            System.out.println("============HIGH_WATER_MARK_VALUE: "+highWaterMark);

            Timestamp oldTime = Timestamp.valueOf(highWaterMark.replace("T"," ").replace("Z", ""));
            //Fetches records where createdDate is greater than the previous high water mark.
            Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();
            filtered.toJSON().write().text(clusterAddress);

            //Calculating the MAX(createdDate) in the fetched dataset.
            Dataset<Row> maxHighWaterRow = filtered.agg(max(filtered.col(highWaterCol)).alias("newHighWater")).persist();           
            List<Timestamp> newHighWaterValue = maxHighWaterRow.select("newHighWater").as(Encoders.TIMESTAMP()).collectAsList();
            Timestamp maxHighWaterMarkValue = newHighWaterValue.iterator().next();              


            SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            Timestamp oldDate = Timestamp.valueOf(highWaterMark.replace('T', ' ').replace("Z",""));         
            //Setting HIGH_WATER_MARK_VALUE if a greater value is detected.
            if(maxHighWaterMarkValue !=null && maxHighWaterMarkValue.after(oldDate)){
                setHighWaterMark(tableNm,dtFormat.format(maxHighWaterMarkValue).replace(" ", "T").concat("Z"));
            }

        }
        else{
            dataset.toJSON().write().text(clusterAddress);
        }


        con.close();
        jsc.close();

    }
}

Any idea why filter and $greater are not fetching the records correctly?

Tags: mongodb, apache-spark, apache-spark-sql

Solution


I fixed this by adding .persist() to the Dataset:

/* ********Read data from MongoDB******* */
Dataset<Row> dataset = MongoSpark.load(jsc).toDF().persist();
....
..
...
Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();

I don't know why, but without persist() the filter was not running over the entire dataset.
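For completeness, one alternative I have come across (not part of my original fix, so take it as an untested sketch) is to push the createdDate comparison down to MongoDB itself as an aggregation $match stage, so the server evaluates it against the whole collection before Spark partitions the read. Assuming highWaterMark is the ISO-8601 string read from Oracle (e.g. "1900-01-01T00:00:00.000Z") and createdDate is the column held in highWaterCol, it would look roughly like this:

// Hypothetical variant, not from the original program: build a $match stage with a
// java.util.Date so the comparison happens against the ISODate values inside MongoDB.
java.util.Date oldDate = java.util.Date.from(java.time.Instant.parse(highWaterMark));
org.bson.Document match = new org.bson.Document("$match",
        new org.bson.Document("createdDate", new org.bson.Document("$gt", oldDate)));

// withPipeline() ships the stage to MongoDB when the RDD is computed.
Dataset<Row> filtered = MongoSpark.load(jsc)
        .withPipeline(java.util.Collections.singletonList(match))
        .toDF();

I have not verified whether this also avoids the inconsistent first-run results described above; it simply moves the date comparison to the server side.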

