首页 > 解决方案 > 如何优化这种数据库到数据库的数据比较?

问题描述

我写了一个小的 Java (JDK 1.8) 类来比较两个数据库之间的数据:先用一个查询从源数据库中读取所有行,然后用多线程在目标数据库中查找每一行(每个线程都有自己的连接和预编译语句)。

下面的代码可以正常工作(我只是删掉了一些日志、密码和部分异常捕获)。例如,对于一个有 140 列的表,我在 3 小时内比较了 28.700.000 行,从 DB2 到 Oracle 中的同一张表。我觉得这个速度不算惊人,但还说得过去 ;)。

所以如果你有优化这段代码的想法,我很乐意采纳 :)

/**
 * Compares the rows of {@code owner.mytable} in a source database with the rows of
 * {@code mytable} in a target database.
 *
 * <p>The main thread streams every source row and hands each one to a
 * {@link CompareDbThread} worker, which looks the row up in the target database by id
 * and compares it column by column. Every in-flight worker owns exactly one entry of
 * {@link #targetPstmtPool}; the index of that entry travels with the task and is handed
 * back through the completion service once the task is done, so a statement is never
 * used by two tasks at the same time.
 */
public class CompareDB {

    /** Number of source rows for which the worker found no difference in the target row. */
    public static AtomicInteger nbRowEqual = new AtomicInteger();
    /** Number of source rows for which the target lookup returned no row. */
    public static AtomicInteger nbRowNotFound = new AtomicInteger();
    /** Number of source rows for which at least one column differs in the target row. */
    public static AtomicInteger nbRowDiff = new AtomicInteger();
    /** Number of source rows whose comparison ended with an exception. */
    public static AtomicInteger nbRowErr = new AtomicInteger();
    /** One prepared lookup statement per target connection, indexed by the number given to each worker. */
    public static List<PreparedStatement> targetPstmtPool = new ArrayList<PreparedStatement>();
    /** Columns selected from both tables, in comparison order. */
    public static final String[] columns = {"id", "col1", "col2", "col3", "col4"};
    /**
     * Index in {@link #columns} of the key column. The workers read this constant to label
     * their log lines; it is 0 because the lookup statement binds the first selected column
     * to {@code where id=?}.
     */
    public static final int ID_COLUMN_POS = 0;

    /**
     * Runs the full comparison and prints progress and a final summary to standard output.
     *
     * @param args unused
     * @throws Exception if a connection cannot be opened, the source query fails, or the
     *                   main thread is interrupted while waiting for a worker
     */
    public static void main(String[] args) throws Exception {
        final int MAX_THREAD = 200;
        int position = 0;
        Integer targetPstmtNumber;
        List<Connection> targetConnectionPool = new ArrayList<Connection>(MAX_THREAD);
        long beginExecTime = System.currentTimeMillis(), startReqTime;
        // Used to print elapsed times as a time of day in UTC; values of 24h or more wrap around.
        DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        // With an unbounded work queue a ThreadPoolExecutor never grows beyond its core size,
        // so the core size itself must be MAX_THREAD for all pooled statements to be used in
        // parallel. Threads are still created lazily, one per submitted task.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                MAX_THREAD, MAX_THREAD, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CompletionService<Integer> completion = new ExecutorCompletionService<Integer>(executor);

        Connection sourceConn = null;
        Statement sourceStmt = null;
        ResultSet sourceResult = null;
        try {
            // Create connection + preparedStatement pools to target DB
            System.out.println("Start opening connections");
            for (int i = 0; i < MAX_THREAD; i++) {
                Connection targetConn = DriverManager.getConnection("jdbc:oracle:thin:@<host>:<port>:<dbname>", "<user>", "<password>");
                // Register the connection first so it is closed even if the setup below fails.
                targetConnectionPool.add(targetConn);
                targetConn.setAutoCommit(false);
                targetConn.setReadOnly(true);
                PreparedStatement targetPstmt = targetConn.prepareStatement("select "+String.join(",", columns)+" from mytable where id=?");
                targetPstmtPool.add(targetPstmt);
            }

            // Connect + select from source DB
            sourceConn = DriverManager.getConnection("jdbc:db2://<IP>:<port>/<dbname>", "<user>", "<password>");
            sourceConn.setAutoCommit(false);
            sourceConn.setReadOnly(true);
            // The result is only ever read with next(), so a scrollable cursor is not needed.
            sourceStmt = sourceConn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            sourceResult = sourceStmt.executeQuery("select "+String.join(",", columns)+ " from owner.mytable");

            System.out.println("Connections and statements opened in "+(System.currentTimeMillis()-beginExecTime)+" ms");

            System.out.println("------------------------------------------------------------");
            System.out.println("Start compare data ("+MAX_THREAD+" parallel threads)");
            System.out.println();
            startReqTime = System.currentTimeMillis();

            // For each row of the source request
            while (sourceResult.next()) {
                // The first MAX_THREAD rows each get a fresh statement; after that, block until
                // a worker finishes and reuse the statement number it hands back.
                if (position < MAX_THREAD) {
                    targetPstmtNumber = Integer.valueOf(position);
                } else {
                    targetPstmtNumber = completion.take().get();
                }

                // Copy the values out of the result set: the cursor moves on before the worker runs.
                List<Object> sourceRow = new ArrayList<Object>(columns.length);
                for (int i = 1; i <= columns.length; i++) {
                    sourceRow.add(sourceResult.getObject(i));
                }

                completion.submit(new CompareDbThread(sourceRow, targetPstmtNumber));

                position++;
                if (position % 10000 == 0) {
                    System.out.println("  -- "+position+" rows --");
                } else if (position % 100 == 0) {
                    System.out.print(".");
                }
            }

            // Await for last threads. shutdown() must come first: awaitTermination() can only
            // return true once a shutdown has been requested.
            System.out.println();
            System.out.println("Waiting last threads...");
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } finally {
            // No-op after a normal termination; on a failure it stops the workers so their
            // non-daemon threads do not keep the JVM alive.
            executor.shutdownNow();

            // Close all Rs, Stmt, Conn. Each close is attempted independently so that one
            // failure does not leave the remaining resources open.
            System.out.println("------------------------------------------------------------");
            System.out.println("Close all requests, statements, connections");
            for (PreparedStatement targetPstmt : targetPstmtPool) {
                closeQuietly(targetPstmt);
            }
            for (Connection targetConn : targetConnectionPool) {
                closeQuietly(targetConn);
            }
            closeQuietly(sourceResult);
            closeQuietly(sourceStmt);
            closeQuietly(sourceConn);
        }

        System.out.println("------------------------------------------------------------");
        System.out.println("Data comparison done in "+(dateFormat.format(new Date(System.currentTimeMillis()-startReqTime)))
                          +" : "+nbRowEqual+" equals, "+nbRowNotFound+" not found, "+nbRowDiff+" diff, "+nbRowErr+" ERROR rows");
        System.out.println("Threads : getCompletedTaskCount() = "+executor.getCompletedTaskCount()+", getTaskCount() = "+executor.getTaskCount());
        System.out.println("Total time : "+(dateFormat.format(new Date(System.currentTimeMillis()-beginExecTime))));
    }

    /** Closes {@code resource} if it is non-null, reporting (but not propagating) any failure. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            System.out.println("[INFO] Error closing connections and requests : "+e.getMessage());
        }
    }
}


/**
 * Task that compares one source row with the row of the same id in the target database.
 *
 * <p>The task uses the prepared statement stored at index {@code targetPstmtNumber} of
 * {@code CompareDB.targetPstmtPool}, updates the {@code CompareDB} counters, logs any
 * difference, and returns the statement index so the caller can hand it to another task.
 */
public class CompareDbThread implements Callable<Integer> {
    /** Column values of the source row, in {@code CompareDB.columns} order. */
    protected List<Object> sourceRow;
    /** Index in {@code CompareDB.targetPstmtPool} of the statement this task may use. */
    protected Integer targetPstmtNumber;

    /**
     * @param sourceRow         values of the source row, one per entry of {@code CompareDB.columns}
     * @param targetPstmtNumber index of the pooled prepared statement reserved for this task
     */
    public CompareDbThread(List<Object> sourceRow, Integer targetPstmtNumber) {
        this.sourceRow = sourceRow;
        this.targetPstmtNumber = targetPstmtNumber;
    }

    /**
     * Looks the source row up in the target database and compares every column.
     *
     * <p>Exactly one of the {@code CompareDB} counters (equal, different, not found, error)
     * is incremented per call. Exceptions are logged and counted, never propagated.
     *
     * @return the statement index received in the constructor
     */
    @Override
    public Integer call() {
        try {
            PreparedStatement targetPstmt = CompareDB.targetPstmtPool.get(targetPstmtNumber.intValue());
            targetPstmt.setObject(1, sourceRow.get(0));

            boolean found;
            StringBuilder message = new StringBuilder();
            // Close the result set as soon as the row has been read instead of relying on the
            // next execution of the statement to release it.
            try (ResultSet targetResult = targetPstmt.executeQuery()) {
                found = targetResult.next();
                if (found) {
                    // Compare each column
                    for (int i = 0; i < CompareDB.columns.length; i++) {
                        String difference = describeDifference(
                                CompareDB.columns[i], sourceRow.get(i), targetResult.getObject(i + 1));
                        if (difference != null) {
                            message.append(difference);
                        }
                    }
                }
            }

            // Counters are updated only after the result set closed cleanly, so a row is
            // never counted both as compared and as an error.
            if (!found) {
                CompareDB.nbRowNotFound.incrementAndGet();
                System.out.println("  [NOT FOUND] ["+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+"]");
            } else if (message.length() == 0) {
                CompareDB.nbRowEqual.incrementAndGet();
            } else {
                CompareDB.nbRowDiff.incrementAndGet();
                System.out.println("  [DIFFERENT] ["+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+"] => "+message);
            }
        }
        catch (Exception e) {
            CompareDB.nbRowErr.incrementAndGet();
            System.err.println("[ERROR] "+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+" : "+e.getMessage());
        }

        return targetPstmtNumber;
    }

    /**
     * Compares one column value from the source with the matching value from the target.
     *
     * <p>Values are considered equal when they are both null, equal as objects, equal as
     * strings, or numerically equal. The numeric check exists because the two drivers may
     * return different Java types or scales for the same number.
     *
     * @param column name of the column, used in the returned text
     * @param source value read from the source database, may be null
     * @param target value read from the target database, may be null
     * @return a fragment describing the difference, or {@code null} if the values are equal
     */
    private static String describeDifference(String column, Object source, Object target) {
        if (source == null && target == null) {
            return null;
        }
        if (source == null || target == null) {
            return column + " : " + target + "(au lieu de " + source + "), ";
        }
        // Compare as objects ...
        if (source.equals(target)) {
            return null;
        }
        // if differents, compare as string ...
        String sourceText = source.toString();
        String targetText = target.toString();
        if (sourceText.equals(targetText)) {
            return null;
        }
        // if differents as string, compare as exact decimals first: going through double would
        // report distinct values as equal once they exceed double precision (long keys, large
        // DECIMAL/NUMBER values). compareTo ignores scale, so 1.0 and 1.00 are equal.
        try {
            java.math.BigDecimal sourceNumber = new java.math.BigDecimal(sourceText.trim());
            java.math.BigDecimal targetNumber = new java.math.BigDecimal(targetText.trim());
            if (sourceNumber.compareTo(targetNumber) == 0) {
                return null;
            }
            return column + " [Number] : " + targetNumber + "(instead of " + sourceNumber + "), ";
        }
        catch (NumberFormatException notDecimal) {
            // Not plain decimal text (e.g. NaN, Infinity): fall back to the double comparison.
        }
        try {
            Double sourceDouble = Double.valueOf(sourceText);
            Double targetDouble = Double.valueOf(targetText);
            if (sourceDouble.equals(targetDouble)) {
                return null;
            }
            return column + " [Number] : " + targetDouble + "(instead of " + sourceDouble + "), ";
        }
        catch (NumberFormatException notNumeric) {
            // At least one side is not a number, so the string difference stands.
            return column + " [String] : " + targetText + "(instead of " + sourceText + "), ";
        }
    }
}

标签: java database performance optimization compare

解决方案


推荐阅读