jdbc操作 資料庫同步,全量,加入執行緒,批處理
阿新 • • 發佈:2019-01-29
使用 JDBC 在兩個 MySQL 資料庫之間做全量同步:先在目標庫補建缺少的表,再清空目標表,並以多執行緒加批處理的方式重新匯入資料以提升效能。(下列程式碼只實作全量同步,增量同步不在本文程式碼範圍內。)
程式碼可以直接執行(需先將 MySQL JDBC 驅動加入 classpath,並把連線 URL 中的資料庫名、帳號與密碼改成自己的設定):
同步的類:
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * Full (truncate-and-reload) copy of every table from a source MySQL database
 * into a destination MySQL database over JDBC.
 *
 * <p>The run has two phases: {@link #synchronizationTables} creates tables that
 * exist only in the source, then {@link #addData} truncates each destination
 * table and reloads it with {@code InsertThread} workers.
 *
 * <p>NOTE(review): every worker is handed the same two {@link Connection}
 * objects. Whether that gives real parallelism depends on the JDBC driver;
 * one connection per worker would be the safer design - confirm before
 * relying on this for large loads.
 */
public class SynchronizationController {
    // Source database.
    // NOTE(review): credentials are embedded in the URL; move them to external configuration.
    static String url_source = "jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8";
    // Destination database.
    static String url_destination = "jdbc:mysql://localhost:3306/xx01?user=root&password=123456&useUnicode=true&characterEncoding=UTF8";
    static Connection conn_source = null;
    static Connection conn_destination = null;
    static String sql_read;
    static String sql_insert;
    // Number of source rows handed to a single worker (one "page").
    static final int batchSize = 15000;
    // Upper bound on concurrently running workers; InsertThread's name pool has the same size.
    static final int max_thread_size = 5;

    /** Placeholder kept for existing callers; does nothing. */
    public static void init() {
    }

    /** Placeholder kept for existing callers; does nothing. */
    public static void writeData() {
    }

    /**
     * Opens both connections, creates missing tables, reloads all data, waits
     * for outstanding workers and closes the connections.
     *
     * @param args unused
     * @throws SQLException declared for compatibility; SQL errors are printed, not rethrown
     * @throws InterruptedException if interrupted while waiting for workers to finish
     */
    public static void main(String[] args) throws SQLException, InterruptedException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn_source = DriverManager.getConnection(url_source);
            conn_destination = DriverManager.getConnection(url_destination);
            // Workers commit explicitly after each batch.
            conn_destination.setAutoCommit(false);
            synchronizationTables(conn_source, conn_destination);
            addData(conn_source, conn_destination);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                // Never close the connections under a worker that is still using them.
                waitForWorkers();
            } finally {
                // Either connection may still be null if setup failed early.
                closeQuietly(conn_source);
                closeQuietly(conn_destination);
            }
        }
    }

    /**
     * Lists the tables (type {@code TABLE}) of the connection's current catalog.
     *
     * @param con connection whose tables are listed
     * @return table names; empty if the metadata lookup failed (the error is printed)
     */
    public static Set<String> getTableName(Connection con) {
        Set<String> names = new HashSet<String>();
        ResultSet rs = null;
        try {
            DatabaseMetaData meta = con.getMetaData();
            // Restrict to the current catalog: a null catalog may match every
            // database on the server, depending on driver configuration.
            rs = meta.getTables(con.getCatalog(), null, null, new String[] { "TABLE" });
            while (rs.next()) {
                names.add(rs.getString("TABLE_NAME"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(rs);
        }
        return names;
    }

    /**
     * Same listing as {@link #getTableName}, returned as a map for lookups.
     *
     * @param con connection whose tables are listed
     * @return map whose keys are table names; every value is the string {@code "1"}
     */
    public static Map<String, String> getTableNameToMap(Connection con) {
        Map<String, String> map = new HashMap<String, String>();
        for (String name : getTableName(con)) {
            map.put(name, "1");
        }
        return map;
    }

    /**
     * Executes a DDL statement on the static destination connection.
     *
     * @param sql_ddl the DDL text, e.g. a {@code CREATE TABLE} statement
     * @throws SQLException if the statement fails
     */
    public static void createTable(String sql_ddl) throws SQLException {
        executeDdl(conn_destination, sql_ddl);
    }

    /** Runs one DDL statement on the given connection and reports success. */
    private static void executeDdl(Connection con, String sql_ddl) throws SQLException {
        Statement stmt = con.createStatement();
        try {
            // executeUpdate returns 0 for DDL and throws on failure; it never
            // returns -1, so reaching the next line means success.
            stmt.executeUpdate(sql_ddl);
            System.out.println("表建立成功");
        } finally {
            closeQuietly(stmt);
        }
    }

    /**
     * Builds the parameterised {@code insert} statement for a table from its
     * column metadata, prints it and returns it.
     *
     * @param tableName table to describe
     * @param con connection on which the table is visible
     * @return text of the form {@code insert into T (c1,c2)  VALUES(?,?) }
     * @throws SQLException if the metadata query fails
     */
    public static String getTableField(String tableName, Connection con) throws SQLException {
        Statement state = null;
        ResultSet rs = null;
        try {
            state = con.createStatement();
            // Only the metadata is needed, so ask for zero rows instead of the whole table.
            rs = state.executeQuery("select * from " + tableName + " where 1=0");
            ResultSetMetaData rsd = rs.getMetaData();
            int columnCount = rsd.getColumnCount();
            StringBuilder sql_model = new StringBuilder("insert into " + tableName + " (");
            StringBuilder sql_param = new StringBuilder(" VALUES(");
            for (int i = 1; i <= columnCount; i++) {
                sql_model.append(rsd.getColumnName(i));
                sql_param.append("?");
                if (i < columnCount) {
                    sql_model.append(",");
                    sql_param.append(",");
                }
            }
            sql_model.append(") ");
            sql_param.append(") ");
            String insertSql = sql_model.toString() + sql_param.toString();
            System.out.println(insertSql);
            return insertSql;
        } finally {
            closeQuietly(rs);
            closeQuietly(state);
        }
    }

    /**
     * Prints name, remarks and type name of each column reported by
     * {@code DatabaseMetaData.getColumns} for the upper-cased table name, using
     * the connection's user name as the schema pattern.
     *
     * @param tableName table whose columns are printed
     * @param conn connection to inspect
     * @throws SQLException if the metadata lookup fails
     */
    public static void getTableField2(String tableName, Connection conn) throws SQLException {
        DatabaseMetaData meta = conn.getMetaData();
        ResultSet rs = meta.getColumns(null, meta.getUserName(), tableName.toUpperCase(), "%");
        try {
            while (rs.next()) {
                String colName = rs.getString("COLUMN_NAME");
                String remarks = rs.getString("REMARKS");
                String dbType = rs.getString("TYPE_NAME");
                System.out.println(colName + "," + remarks + "," + dbType);
            }
        } finally {
            closeQuietly(rs);
        }
    }

    /**
     * Fetches a table's DDL via {@code show create table}.
     *
     * @param tableName table whose DDL is wanted
     * @param conn connection on which the table is visible
     * @return the second column of the statement's result (the DDL text)
     * @throws SQLException if the statement fails
     */
    public static String getTableDDL(String tableName, Connection conn) throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = conn.prepareStatement("show create table " + tableName);
            rs = ps.executeQuery();
            StringBuilder ddl = new StringBuilder();
            while (rs.next()) {
                ddl.append(rs.getString(2));
            }
            return ddl.toString();
        } finally {
            closeQuietly(rs);
            closeQuietly(ps);
        }
    }

    /**
     * Creates, in database B, every table that exists in database A but not in B,
     * using A's DDL. Only existence is checked; table definitions are not compared.
     *
     * @param conA source connection
     * @param conB destination connection
     * @throws SQLException if reading a DDL or creating a table fails
     */
    public static void synchronizationTables(Connection conA, Connection conB) throws SQLException {
        Set<String> destinationTables = getTableName(conB);
        for (String table : getTableName(conA)) {
            if (!destinationTables.contains(table)) {
                System.out.println("表名:" + table + " 不在目標庫裡");
                // Create on the connection passed in, not on the static field.
                executeDdl(conB, getTableDDL(table, conA));
            }
        }
    }

    /**
     * Empties a table with {@code TRUNCATE TABLE}.
     *
     * @param tableName table to empty
     * @param con connection on which to run the statement
     * @return {@code true} on success, {@code false} if the statement failed (the error is printed)
     */
    public static boolean clearTableData(String tableName, Connection con) {
        Statement stmt = null;
        try {
            stmt = con.createStatement();
            stmt.executeUpdate("TRUNCATE TABLE " + tableName);
            System.out.println(tableName + ":表資料已被清空");
            return true;
        } catch (SQLException e) {
            e.printStackTrace();
            System.out.println("異常表:" + tableName + "----資料清空失敗");
            return false;
        } finally {
            closeQuietly(stmt);
        }
    }

    /**
     * Reloads every table of A into B: truncate the destination table, then copy
     * the rows with {@code InsertThread} workers. Tables larger than
     * {@link #batchSize} rows are split into {@code LIMIT offset, size} pages with
     * at most {@link #max_thread_size} workers running at once. Tables whose
     * truncation fails are skipped.
     *
     * <p>NOTE(review): the paged query has no {@code ORDER BY}, so the database is
     * free to return rows in a different order per page; rows could in principle
     * be duplicated or missed. Ordering by a key column would make paging stable.
     *
     * @param conA source connection
     * @param conB destination connection
     * @throws SQLException if a metadata or count query on the source fails
     * @throws InterruptedException if interrupted while waiting for workers
     */
    public static void addData(Connection conA, Connection conB) throws SQLException, InterruptedException {
        Statement stmt_source = conA.createStatement();
        try {
            // Do not start while workers from an earlier run are still active.
            waitForWorkers();
            for (String table : getTableName(conA)) {
                long start = System.currentTimeMillis();
                if (!clearTableData(table, conB)) {
                    continue;
                }
                String insertSql = getTableField(table, conA);
                long totalCount = countRows(stmt_source, table);
                if (totalCount > batchSize) {
                    long pages = (totalCount + batchSize - 1) / batchSize;
                    for (long page = 0; page < pages; page++) {
                        String sql_data = "select * from " + table + " limit " + page * batchSize + " , " + batchSize;
                        startWorkerWhenSlotFree(sql_data, insertSql, conB, conA);
                    }
                } else {
                    Thread td = new InsertThread("select * from " + table, insertSql, conB, conA);
                    td.start();
                }
                // Finish this table before timing it and before truncating the next one.
                waitForWorkers();
                long end = System.currentTimeMillis();
                System.out.println(table + " 表資料匯入完成,耗時:" + (end - start) / 1000 + "秒," + (end - start) / 60000 + "分鐘");
            }
        } finally {
            closeQuietly(stmt_source);
        }
    }

    /** Returns the row count of a table using the given statement. */
    private static long countRows(Statement stmt, String table) throws SQLException {
        ResultSet rs = stmt.executeQuery("select count(*) from " + table);
        try {
            rs.next();
            return rs.getLong(1);
        } finally {
            closeQuietly(rs);
        }
    }

    /** Blocks until fewer than max_thread_size workers are running, then starts one more. */
    private static void startWorkerWhenSlotFree(String sql_data, String insertSql, Connection conB, Connection conA)
            throws InterruptedException {
        synchronized (InsertThread.class) {
            int tCount = InsertThread.getThreadCounts();
            while (tCount >= max_thread_size) {
                System.out.println("系統當前執行緒數為:" + tCount + ",已達到最大執行緒數 " + max_thread_size + ",請等待其他執行緒執行完畢並釋放系統資源");
                // Workers call notifyAll on this monitor when they finish.
                InsertThread.class.wait();
                tCount = InsertThread.getThreadCounts();
            }
            Thread td = new InsertThread(sql_data, insertSql, conB, conA);
            td.start();
            System.out.println("已建立新的子執行緒: " + td.getName());
        }
    }

    /** Blocks until no InsertThread worker is registered as running. */
    private static void waitForWorkers() throws InterruptedException {
        synchronized (InsertThread.class) {
            while (InsertThread.getThreadCounts() > 0) {
                InsertThread.class.wait();
            }
        }
    }

    /** Closes a result set if non-null; a failure to close is printed, not thrown. */
    private static void closeQuietly(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes a statement if non-null; a failure to close is printed, not thrown. */
    private static void closeQuietly(Statement stmt) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes a connection if non-null; a failure to close is printed, not thrown. */
    private static void closeQuietly(Connection con) {
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
執行緒控制類(InsertThread:負責讀取一頁資料並批次寫入目標庫,同時維護執行中的執行緒計數):
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
/**
 * Worker thread that runs one {@code select} against connection A and writes
 * every returned row to connection B through a batched, parameterised insert.
 *
 * <p>The class keeps a static count of live workers and a small pool of thread
 * names, both guarded by the {@code InsertThread.class} monitor. A worker is
 * counted from construction until the end of {@link #run()}, where it calls
 * {@code notifyAll} on that monitor so code waiting for a free slot can
 * proceed. A worker that is constructed but never run stays counted.
 */
public class InsertThread extends Thread {
    private String sql_data;
    private String sql_insert;
    private Connection conB;
    private Connection conA;
    // Rows accumulated before the batch is executed and committed.
    static int batchSize = 2500;
    // Number of workers constructed and not yet finished.
    static private int threadCounts;
    // Pool of thread names; a null entry means the name is currently in use.
    static private String threadNames[];
    static {
        // The name pool holds 5 names.
        int maxThreadCounts = 5;
        threadNames = new String[maxThreadCounts];
        for (int i = 1; i <= maxThreadCounts; i++) {
            threadNames[i - 1] = "子執行緒_" + i;
        }
    }

    // Index of the pooled name held by this worker, or -1 if the pool was empty.
    private int nameSlot = -1;
    // The pooled name taken at construction; returned to the pool on release.
    private String pooledName;
    // True while this worker is included in threadCounts.
    private boolean registered;

    /**
     * Creates a counted worker without SQL or connections. Running it only
     * reports the resulting exception and releases its slot.
     */
    public InsertThread() {
        register();
    }

    /**
     * Creates a counted worker.
     *
     * @param sql_data   query executed on {@code conA} to read the rows
     * @param sql_insert insert statement with one {@code ?} per selected column
     * @param conB       destination connection; it is committed after each batch
     * @param conA       source connection
     */
    public InsertThread(String sql_data, String sql_insert, Connection conB, Connection conA) {
        super();
        this.sql_data = sql_data;
        this.sql_insert = sql_insert;
        this.conB = conB;
        this.conA = conA;
        register();
    }

    /** Counts this worker and takes a free pooled name, if there is one. */
    private void register() {
        synchronized (InsertThread.class) {
            threadCounts++;
            registered = true;
            for (int i = 0; i < threadNames.length; i++) {
                if (threadNames[i] != null) {
                    pooledName = threadNames[i];
                    nameSlot = i;
                    // Mark the name as taken.
                    threadNames[i] = null;
                    this.setName(pooledName);
                    break;
                }
            }
        }
    }

    /** Returns the pooled name, uncounts this worker and wakes waiting threads. */
    private void release() {
        synchronized (InsertThread.class) {
            // Remember the slot instead of parsing the thread name: a worker
            // created while the pool was empty has no "_N" suffix to parse.
            if (nameSlot >= 0) {
                threadNames[nameSlot] = pooledName;
                nameSlot = -1;
            }
            if (registered) {
                threadCounts--;
                registered = false;
            }
            // notifyAll does not release the monitor; waiters resume only
            // after this synchronized block has been left.
            InsertThread.class.notifyAll();
            System.out.println("----" + this.getName() + " 所佔用資源釋放完畢,當前系統正在執行的子執行緒數:"
                    + threadCounts);
        }
    }

    /**
     * Copies the rows: reads with {@code sql_data}, binds every column as a
     * string, flushes and commits every {@code batchSize} rows and once more at
     * the end. Any exception is printed and ends the copy; the worker's slot is
     * released in every case.
     */
    @Override
    public void run() {
        Statement stmt_source = null;
        ResultSet rs_sql_data = null;
        PreparedStatement ps = null;
        try {
            long start = System.currentTimeMillis();
            stmt_source = conA.createStatement();
            rs_sql_data = stmt_source.executeQuery(sql_data);
            ResultSetMetaData rsmd = rs_sql_data.getMetaData();
            ps = conB.prepareStatement(sql_insert);
            int columnCount = rsmd.getColumnCount();
            // Rows added so far; starting at 0 makes each flush exactly batchSize rows.
            int count = 0;
            while (rs_sql_data.next()) {
                for (int k = 1; k <= columnCount; k++) {
                    ps.setString(k, rs_sql_data.getString(k));
                }
                ps.addBatch();
                count++;
                if (count % batchSize == 0) {
                    ps.executeBatch();
                    conB.commit();
                }
            }
            // Flush whatever is left over after the last full batch.
            ps.executeBatch();
            conB.commit();
            long end = System.currentTimeMillis();
            System.out.println(this.getName() + ",耗時:" + (end - start) / 1000 + "秒");
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            closeQuietly(rs_sql_data, stmt_source, ps);
            release();
        }
    }

    /** Closes the given JDBC objects if non-null; close failures are printed. */
    private static void closeQuietly(ResultSet rs, Statement source, PreparedStatement insert) {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
        try {
            if (source != null) {
                source.close();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
        try {
            if (insert != null) {
                insert.close();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * @return the number of workers constructed and not yet finished
     */
    static public int getThreadCounts() {
        synchronized (InsertThread.class) {
            return threadCounts;
        }
    }
}