java - 有什么思路可以优化这种数据库到数据库的数据比较?
问题描述
我写了一个小的 java (jdk1.8) 类来比较两个数据库之间的数据:一个请求从源数据库中获取行,然后多任务搜索目标数据库中的每一行(每个线程都有自己的连接和准备好的语句)。
下面的代码可以正常工作(我只是删掉了一些日志、密码和部分异常捕获)。例如,对于一张有 140 列的表,从 DB2 比较到 Oracle 中的同一张表,我在 3 小时内比较了 28.700.000 行。我觉得这个速度算不上惊人,但还说得过去 ;)。
所以,如果你有优化这段代码的想法,欢迎提出 :)
/**
 * Compares every row of a source table with the same table in a target database.
 *
 * <p>The main thread reads the source query row by row and hands each row to a
 * {@link CompareDbThread} task together with the index of a prepared statement
 * in {@link #targetPstmtPool}. An index is handed out again only after the task
 * that previously held it has completed, so a prepared statement is never used
 * by two tasks at the same time.
 */
public class CompareDB {
    /** Number of source rows whose target row matched on every column. */
    public static AtomicInteger nbRowEqual = new AtomicInteger();
    /** Number of source rows with no matching row in the target. */
    public static AtomicInteger nbRowNotFound = new AtomicInteger();
    /** Number of source rows whose target row differs on at least one column. */
    public static AtomicInteger nbRowDiff = new AtomicInteger();
    /** Number of source rows whose comparison raised an exception. */
    public static AtomicInteger nbRowErr = new AtomicInteger();
    /** One prepared statement per target connection; tasks address them by index. */
    public static List<PreparedStatement> targetPstmtPool = new ArrayList<PreparedStatement>();
    /** Columns selected from both tables, in comparison order. */
    public static final String[] columns = {"id", "col1", "col2", "col3", "col4"};
    /**
     * Index in {@link #columns} of the key column. {@link CompareDbThread} reads
     * this constant; the target query filters on {@code id}, which is column 0.
     */
    public static final int ID_COLUMN_POS = 0;

    /**
     * Opens the connections, runs the comparison and prints a summary.
     *
     * @param args unused
     * @throws Exception if a connection, the source query or the task hand-off fails
     */
    public static void main(String[] args) throws Exception {
        final int MAX_THREAD = 200;
        int position = 0;
        List<Connection> targetConnectionPool = new ArrayList<Connection>();
        long beginExecTime = System.currentTimeMillis(), startReqTime;
        DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        Connection sourceConn = null;
        Statement sourceStmt = null;
        ResultSet sourceResult = null;
        // With an unbounded work queue a ThreadPoolExecutor never grows beyond its
        // core size, so core and maximum are both MAX_THREAD: one worker per
        // target connection/statement, matching what the log announces.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            // Create connection + preparedStatement pools to target DB
            System.out.println("Start opening connections");
            for (int i = 0; i < MAX_THREAD; i++) {
                Connection targetConn = DriverManager.getConnection("jdbc:oracle:thin:@<host>:<port>:<dbname>", "<user>", "<password>");
                // Register first so the connection is closed even if a later call throws.
                targetConnectionPool.add(targetConn);
                targetConn.setAutoCommit(false);
                targetConn.setReadOnly(true);
                targetPstmtPool.add(targetConn.prepareStatement("select "+String.join(",", columns)+" from mytable where id=?"));
            }
            // Connect + select from source DB
            sourceConn = DriverManager.getConnection("jdbc:db2://<IP>:<port>/<dbname>", "<user>", "<password>");
            sourceConn.setAutoCommit(false);
            sourceConn.setReadOnly(true);
            // The result is only walked with next(), so a forward-only cursor is enough.
            // NOTE(review): a scrollable cursor may force the driver or server to
            // materialize the whole result - confirm the gain with the DB2 driver in use.
            sourceStmt = sourceConn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            sourceResult = sourceStmt.executeQuery("select "+String.join(",", columns)+ " from owner.mytable");
            System.out.println("Connections and statements opened in "+(System.currentTimeMillis()-beginExecTime)+" ms");
            CompletionService<Integer> completion = new ExecutorCompletionService<Integer>(executor);
            System.out.println("------------------------------------------------------------");
            System.out.println("Start compare data ("+MAX_THREAD+" parallel threads)");
            System.out.println();
            startReqTime = System.currentTimeMillis();
            // For each row of the source request
            while (sourceResult.next()) {
                // The first MAX_THREAD rows each get a fresh statement; afterwards wait
                // for a finished task and reuse the statement index it returns.
                Integer targetPstmtNumber;
                if (position < MAX_THREAD) {
                    targetPstmtNumber = Integer.valueOf(position);
                } else {
                    targetPstmtNumber = completion.take().get();
                }
                // Copy the row out of the ResultSet: the cursor moves on before the task runs.
                List<Object> sourceRow = new ArrayList<Object>(columns.length);
                for (int i = 1; i <= columns.length; i++) {
                    sourceRow.add(sourceResult.getObject(i));
                }
                completion.submit(new CompareDbThread(sourceRow, targetPstmtNumber));
                position++;
                if (position % 10000 == 0) {
                    System.out.println(" -- "+position+" rows --");
                } else if (position % 100 == 0) {
                    System.out.print(".");
                }
            }
            // Await for last threads
            System.out.println();
            System.out.println("Waiting last threads...");
            // shutdown() must come first: awaitTermination() only returns true once a
            // shutdown has been requested and every submitted task has finished.
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } finally {
            // No-op after a normal termination; stops the workers if we are failing.
            executor.shutdownNow();
            // Close all Rs, Stmt, Conn
            System.out.println("------------------------------------------------------------");
            System.out.println("Close all requests, statements, connections");
            for (PreparedStatement targetPstmt : targetPstmtPool) {
                closeQuietly(targetPstmt);
            }
            for (Connection targetConn : targetConnectionPool) {
                closeQuietly(targetConn);
            }
            closeQuietly(sourceResult);
            closeQuietly(sourceStmt);
            closeQuietly(sourceConn);
        }
        System.out.println("------------------------------------------------------------");
        // A duration is formatted as a UTC time of day, so it wraps after 24 hours.
        System.out.println("Data comparison done in "+(dateFormat.format(new Date(System.currentTimeMillis()-startReqTime)))
                +" : "+nbRowEqual+" equals, "+nbRowNotFound+" not found, "+nbRowDiff+" diff, "+nbRowErr+" ERROR rows");
        System.out.println("Threads : getCompletedTaskCount() = "+executor.getCompletedTaskCount()+", getTaskCount() = "+executor.getTaskCount());
        System.out.println("Total time : "+(dateFormat.format(new Date(System.currentTimeMillis()-beginExecTime))));
    }

    /** Closes one resource, logging instead of propagating so the remaining ones still get closed. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            System.out.println("[INFO] Error closing connections and requests : "+e.getMessage());
        }
    }
}
/**
 * Task that looks up one source row in the target database and compares the
 * two rows column by column, updating the counters in {@link CompareDB}.
 *
 * <p>The task returns the index of the prepared statement it used so the
 * caller can hand that statement to another task.
 */
public class CompareDbThread implements Callable<Integer> {
    /** Column values of the source row, in {@code CompareDB.columns} order. */
    protected List<Object> sourceRow;
    /** Index into {@code CompareDB.targetPstmtPool} of the statement this task uses. */
    protected Integer targetPstmtNumber;

    /**
     * @param sourceRow         values of the source row, in {@code CompareDB.columns} order
     * @param targetPstmtNumber index of the target prepared statement reserved for this task
     */
    public CompareDbThread(List<Object> sourceRow, Integer targetPstmtNumber) {
        this.sourceRow = sourceRow;
        this.targetPstmtNumber = targetPstmtNumber;
    }

    /**
     * Queries the target row by key, compares it with the source row and
     * increments exactly one of the {@code CompareDB} counters.
     *
     * @return the statement index passed to the constructor; returned even when
     *         the comparison failed, because every exception is caught and counted
     */
    @Override
    public Integer call() {
        try {
            PreparedStatement targetPstmt = CompareDB.targetPstmtPool.get(targetPstmtNumber.intValue());
            // Same key position as the one printed in the messages below.
            targetPstmt.setObject(1, sourceRow.get(CompareDB.ID_COLUMN_POS));
            // Close the cursor as soon as the row has been compared.
            try (ResultSet targetResult = targetPstmt.executeQuery()) {
                if (!targetResult.next()) {
                    CompareDB.nbRowNotFound.incrementAndGet();
                    System.out.println(" [NOT FOUND] ["+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+"]");
                } else {
                    // Collect one fragment per differing column; empty means the rows match.
                    StringBuilder message = new StringBuilder();
                    for (int i = 0; i < CompareDB.columns.length; i++) {
                        message.append(describeDifference(CompareDB.columns[i], sourceRow.get(i), targetResult.getObject(i + 1)));
                    }
                    if (message.length() == 0) {
                        CompareDB.nbRowEqual.incrementAndGet();
                    } else {
                        CompareDB.nbRowDiff.incrementAndGet();
                        System.out.println(" [DIFFERENT] ["+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+"] => "+message);
                    }
                }
            }
        } catch (Exception e) {
            CompareDB.nbRowErr.incrementAndGet();
            System.err.println("[ERROR] "+CompareDB.columns[CompareDB.ID_COLUMN_POS]+"="+sourceRow.get(CompareDB.ID_COLUMN_POS)+" : "+e.getMessage());
        }
        return targetPstmtNumber;
    }

    /**
     * Compares one column value from each side.
     *
     * <p>Values are tried as equal objects, then as equal strings, then as equal
     * numbers. The numeric step uses {@code BigDecimal.compareTo}, so values that
     * differ only in scale ({@code 1.50} vs {@code 1.5}) match, while values that
     * differ beyond {@code double} precision are still told apart.
     *
     * @return an empty string when the values match, otherwise a message fragment
     *         ending in {@code ", "}
     */
    private static String describeDifference(String column, Object sourceValue, Object targetValue) {
        if (sourceValue == null && targetValue == null) {
            return "";
        }
        if (sourceValue == null || targetValue == null) {
            return column + " : " + targetValue + "(au lieu de " + sourceValue + "), ";
        }
        if (sourceValue.equals(targetValue)) {
            return "";
        }
        // Arrays use identity equals() and an identity-based toString(), so binary
        // columns have to be compared by content.
        if (sourceValue instanceof byte[] && targetValue instanceof byte[]
                && java.util.Arrays.equals((byte[]) sourceValue, (byte[]) targetValue)) {
            return "";
        }
        String sourceText = sourceValue.toString();
        String targetText = targetValue.toString();
        if (sourceText.equals(targetText)) {
            return "";
        }
        try {
            // trim() keeps the tolerance for surrounding blanks that Double parsing had.
            java.math.BigDecimal sourceNumber = new java.math.BigDecimal(sourceText.trim());
            java.math.BigDecimal targetNumber = new java.math.BigDecimal(targetText.trim());
            if (sourceNumber.compareTo(targetNumber) == 0) {
                return "";
            }
            return column + " [Number] : " + targetNumber + "(instead of " + sourceNumber + "), ";
        } catch (NumberFormatException e) {
            // Not numeric on at least one side: the string difference stands.
            return column + " [String] : " + targetText + "(instead of " + sourceText + "), ";
        }
    }
}
解决方案
推荐阅读
- python - 有没有比覆盖我的父 Python 类的每个方法更好的方法?
- firebase - firebase auth => auth/captcha-check-failed 和“找不到主机名匹配”错误
- html - 如何更改导航元素的填充?
- ios - XCode 构建中 react-native-config 的 PhaseScriptExecution 失败
- python - 值错误“”使用序列设置数组元素”,组套索参数
- ssl - Apollo federation PUSH 命令跳过 SSLValidation
- ssl - SSLStrip + Mitmdump 不能一起工作
- kotlin-native - Kotlin/Native 编译器如何处理多态性?
- python - 如何在 Python 中正确登录不同的文件?
- c# - 何时使用 API、Nuget 和内部库