利用JDBC連線實現跨伺服器跨資料庫跨表資料傳輸
如題
我現在有兩個伺服器 A和B 我現在要把A伺服器上的某一個庫裡面的所有的表及結構及資料 匯入另外一個伺服器上,實現原理利用原始JDBC 完成
程式碼類:
回家再擼 目前在公司 。。 下班 哈哈 晚上吃了點飯 就回來晚了 繼續寫
首先JDBC連線類
package com.gpdata.ic.usermanagement.admin.datasource.insert; import java.sql.*; /** * Created by qws on 2017/5/26/026.連線config*/ public class ConnectionDateBases { /** * 資源伺服器連線 **/ private static String targetUrl = UtilConfig.targetUrl; /** * 本地伺服器連線 **/ private static String nativeUrl = UtilConfig.nativeUrl; /** * 使用者名稱 */ private static String userName = "root"; /** * 密碼*/ private static String password = "root"; /** * 連線 */ private static String driver = "com.mysql.jdbc.Driver"; public Connection getNativeConnection() { Connection conn = null; try { Class.forName(driver); conn = DriverManager.getConnection(nativeUrl, userName, password); } catch (Exception e) { e.printStackTrace(); } return conn; } public Connection getSourceConnection() { Connection conn = null; try { Class.forName(driver); conn = DriverManager.getConnection(targetUrl, userName, password); } catch (Exception e) { e.printStackTrace(); } return conn; } /** * 釋放連線 * * @param conn */ private static void freeConnection(Connection conn) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * 釋放statement * * @param statement */ private static void freeStatement(Statement statement) { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * 釋放statement * * @param statement */ private static void freePreStatement(PreparedStatement statement) { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * 釋放resultset * * @param rs */ private static void freeResultSet(ResultSet rs) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param conn * @param statement * @param rs */ public static void free(Connection conn, PreparedStatement preparedStatement, Statement statement, ResultSet rs) { if (rs != null) { freeResultSet(rs); } if (preparedStatement != null) { freePreStatement(preparedStatement); } if (statement != null) { freeStatement(statement); } if (conn != null) { freeConnection(conn); } } }
package com.gpdata.ic.usermanagement.admin.datasource.insert; /** * Created by qws on 2017/5/27/027. */ public class UtilConfig { /** * 定義資料庫字首 **/ final static String pre_dataBase = "gp_clone"; /** * 定義資料表字首 **/ final static String pre_tableName = "gp_"; /** * 定義來源資料庫連結 **/ final static String targetUrl = "jdbc:mysql://127.0.0.1:3306/uuuu"; /** * 定義本地資料庫連結 **/ final static String nativeUrl = "jdbc:mysql://127.0.0.1:3306/uuuu";
} JDBC裡面的配置 是從這裡拿的
Dao 層
資料庫的一些操作 裡面有詳細描述
package com.gpdata.ic.usermanagement.admin.datasource.insert; import com.gpdata.ic.usermanagement.admin.entity.Tablestructure; import java.sql.*; import java.util.ArrayList; import java.util.List; /** * Created by qws on 2017/5/26/026. */ public class SyDateDao { ConnectionDateBases dataBase = new ConnectionDateBases(); /** * 獲得資料庫名->以後用到 **/ public List getDateBaseDao(Connection con) { PreparedStatement ptst = null; ResultSet rs = null; List<String> datename = new ArrayList(); // 資料庫名 String sql = "show databases"; try { ptst = con.prepareStatement(sql); rs = ptst.executeQuery(); while (rs.next()) { datename.add(rs.getString("Database")); } dataBase.free(con, ptst, null, rs); } catch (SQLException e) { e.printStackTrace(); } return datename; } /** * 生成資料庫名字 **/ public void createDataBase(Connection con, String sourcedataname) { PreparedStatement ptst = null; String sql = new SqlUtil().createDataBase(sourcedataname); try { ptst = con.prepareStatement(sql); ptst.execute(); dataBase.free(null, ptst, null, null); } catch (SQLException e) { e.printStackTrace(); } } /** * 獲得連線資源的某表 **/ public List getTableDao(Connection con, String dataname) { PreparedStatement ptst = null; ResultSet rs = null; List<String> Table = new ArrayList(); String sql = "SELECT TABLE_NAME FROM information_schema.tables t WHERE t.table_schema = '" + dataname + "'"; try { ptst = con.prepareStatement(sql); rs = ptst.executeQuery(); while (rs.next()) { Table.add(rs.getString("TABLE_NAME")); } dataBase.free(null, ptst, null, rs); } catch (SQLException e) { e.printStackTrace(); } return Table; } /** * 獲得資源庫某表的條數 **/ public List getTableCount(Connection con, String table, String fromDate, String toDate) { PreparedStatement ptst = null; ResultSet rs; List count = new ArrayList(); //數量 try { String sql = "select count(1) from " + table; if (fromDate != null && toDate != null) { sql = "select count(1) from " + table + " where createtime > ' " + fromDate + " ' and createtime < ' " + toDate + " ' "; } ptst = con.prepareStatement(sql); rs = ptst.executeQuery(); while (rs.next()) { count.add(rs.getObject("count(1)")); } dataBase.free(null, ptst, null, rs); } catch (SQLException e) { return null; } return count; } /** * 查詢某表所有欄位 **/ public List Find_table_field(Connection con, String table) { PreparedStatement ptst = null; ResultSet rs; List field = new ArrayList(); //欄位 String sql = "desc " + table; try { ptst = con.prepareStatement(sql); rs = ptst.executeQuery(); while (rs.next()) { field.add(rs.getObject("field")); } dataBase.free(null, ptst, null, rs); } catch (SQLException e) { e.printStackTrace(); } return field; } /** * 增加接入時間的欄位 **/ public void alertTime(Connection con, String tableName) { PreparedStatement ptst = null; String sql = new SqlUtil().alertSql(tableName); try { ptst = con.prepareStatement(sql); ptst.executeUpdate(); dataBase.free(null, ptst, null, null); } catch (SQLException e) { e.printStackTrace(); } } /** * 全量清表 **/ public void trunTable(Connection con, String tableName) { PreparedStatement ptst = null; String sql = "TRUNCATE " + tableName; try { ptst = con.prepareStatement(sql); ptst.execute(); dataBase.free(null, ptst, null, null); } catch (SQLException e) { e.printStackTrace(); } } /** * 同步表資料 **/ public void launchSyncData(Connection coreConnection, Connection targetConn, String tableName, String fromdate, String todate, String nativeSource) { if (fromdate == null || todate == null) { this.trunTable(targetConn, UtilConfig.pre_tableName + tableName); } try { Statement coreStmt = coreConnection.createStatement(); //本地 List<String> field = new SyDateDao().Find_table_field(targetConn, nativeSource + "." + UtilConfig.pre_tableName + tableName); //有時間限制 List list = new SyDateDao().getTableCount(coreConnection, tableName, fromdate, todate); int size = Integer.parseInt(list.get(0).toString()); int a = field.size(); //19 int b = a; int c = 1; //預設全量pre語句 String preSql = new SqlUtil().preSql(tableName); if (fromdate != null && todate != null) { //增量 preSql = new SqlUtil().addPreSql(tableName); } PreparedStatement targetPstmt = targetConn.prepareStatement(preSql); //批處理 10 條處理 int page = size / 1000; for (int i = 0; i < page + 1; i++) { int size2 = i * 1000; String seleSql = "select * from " + tableName + " limit " + size2 + ",1000 "; if (fromdate != null && todate != null) { seleSql = "select * from " + tableName + " where createtime > ' " + fromdate + " ' and createtime < ' " + todate + " ' limit " + size2 + ",1000"; } ResultSet coreRs = coreStmt.executeQuery(seleSql); while (coreRs.next()) { targetPstmt.setObject(1, null); while (++c < b) { targetPstmt.setObject(c, coreRs.getObject(c)); } c = 1; targetPstmt.execute(); } coreRs.close(); } coreStmt.close(); targetPstmt.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * by ysh 查詢某庫下的表所有欄位 **/ public List<Tablestructure> listsql(Connection con, String databasename, String tablename) { PreparedStatement ptst = null; ResultSet rs; List<Tablestructure> field = new ArrayList(); //欄位 final String sql = "describe " + databasename + "." + tablename + ""; try { ptst = con.prepareStatement(sql); rs = ptst.executeQuery(); while (rs.next()) { Tablestructure tablestructure = new Tablestructure(); tablestructure.setField(rs.getString("Field")); tablestructure.setType(rs.getString("type")); tablestructure.setNull(rs.getString("Null")); tablestructure.setKey(rs.getString("key")); tablestructure.setDefault(rs.getString("Default")); tablestructure.setExtra(rs.getString("Extra")); field.add(tablestructure); } dataBase.free(null, ptst, null, rs); } catch (SQLException e) { e.printStackTrace(); } return field; } /** * 切換使用者進行 建立表結構 * **/ public void moveUse(Connection source,Connection con, String sourcebaseName,String tableName) { PreparedStatement move_ptst = null; PreparedStatement drop_ptst = null; PreparedStatement table_ptst = null; String move_sql = new SqlUtil().useSQL( UtilConfig.pre_dataBase+"_"+sourcebaseName); String drop_sql=new SqlUtil().Tableexist(UtilConfig.pre_tableName+tableName); String create_sql=new SqlUtil().allSql(source,sourcebaseName,tableName); try { move_ptst = con.prepareStatement(move_sql); move_ptst.execute(); drop_ptst = con.prepareStatement(drop_sql); drop_ptst.execute(); table_ptst = con.prepareStatement(create_sql); table_ptst.execute(); dataBase.free(null, table_ptst, null, null); drop_ptst.close(); move_ptst.close(); } catch (SQLException e) { e.printStackTrace(); } } }
資料用的批處理 1000一處理,, 畢竟不會執行緒 。。。 尷尬
拼接SQL的類
package com.gpdata.ic.usermanagement.admin.datasource.insert; import com.gpdata.ic.usermanagement.admin.entity.Tablestructure; import java.sql.Connection; import java.util.List; /** * Created by qws on 2017/5/26/026. */ public class SqlUtil { //本地資源庫 Connection natiCon = new ConnectionDateBases().getNativeConnection(); //對方資源庫 Connection sourceCon = new ConnectionDateBases().getSourceConnection(); /** * 生成增加接入時間的列名 **/ public String alertSql(String tableName) { StringBuffer sb = new StringBuffer(); sb.append("ALTER table " + tableName + " add gp_updatetime timestamp null"); return sb.toString(); } /** * 全量同步 * 本地結構跟源結構一致,查詢本地結構加欄位 * 生成預處理資料庫插入語句 **/ public String preSql(String tableName) { String natiTable = UtilConfig.pre_tableName + tableName; List list = new SyDateDao().Find_table_field(sourceCon, tableName); StringBuffer sb = new StringBuffer(); sb.append(" REPLACE INTO " + natiTable + "("); for (int i = 0; i < list.size(); i++) { sb.append(list.get(i) + ","); } sb.append("gp_updatetime )values("); for (int i = 0; i < list.size(); i++) { sb.append("?,"); } sb.append("CURRENT_TIMESTAMP )"); return sb.toString(); } /** * 增量同步 * 本地結構跟源結構一致,查詢本地結構加欄位 * 生成預處理資料庫插入語句 **/ public String addPreSql(String tableName) { String natiTable = UtilConfig.pre_tableName + tableName; List list = new SyDateDao().Find_table_field(sourceCon, tableName); StringBuffer sb = new StringBuffer(); sb.append("INSERT INTO " + natiTable + "("); for (int i = 0; i < list.size(); i++) { sb.append(list.get(i) + ","); } sb.append("gp_updatetime )values("); for (int i = 0; i < list.size(); i++) { sb.append("?,"); } sb.append("CURRENT_TIMESTAMP )"); return sb.toString(); } /** * 拿到另一個庫中額中的表字段 by ysh **/ public String allSql(Connection con, String databasename, String tablename) { List<Tablestructure> list = new SyDateDao().listsql(con, databasename, tablename); String alllsitsql = ""; for (Tablestructure tablestructure : list) { alllsitsql += "`" + tablestructure.getField() + "`" + " " + tablestructure.getType() + " "; if (tablestructure.getNull().equals("NO")) { alllsitsql += "NOT NULL" + " "; } if (tablestructure.getExtra().equals("auto_increment") && tablestructure.getKey().equals("PRI")) { alllsitsql += "AUTO_INCREMENT" + ","; } if (tablestructure.getDefault() == null && tablestructure.getNull().trim().equals("YES")) { alllsitsql += "DEFAULT NULL " + " "; } if (tablestructure.getKey().trim().contains("PRI")) { alllsitsql += " PRIMARY KEY (`" + tablestructure.getField() + "`)"; } alllsitsql += ","; } String allString = alllsitsql.substring(0, alllsitsql.length() - 1); String sql2 = "CREATE TABLE "+ UtilConfig.pre_tableName+tablename + " (" + allString + " ) DEFAULT CHARSET=utf8"; return sql2; } /** * 建立資料庫 字首一gp_clone命名 **/ public String createDataBase(String dataBaseName) { return "create database " + UtilConfig.pre_dataBase + "_" + dataBaseName; } /** * 驗證表是否存在 by ysh **/ public String Tableexist( String tablename) { String sql1 = " DROP TABLE IF EXISTS " + tablename + ";"; return sql1; } public String useSQL(String nativeName) { return "use " + nativeName + " ; "; } }
這個 拼接表結構的實體類。。。。 寫的不好 哈哈 不是我寫的 (*^__^*) 嘻嘻……
package com.gpdata.ic.usermanagement.admin.entity; /** * Created by yushuanghong on 2017/5/27. */ public class Tablestructure { private String Field; private String type; private String Null; private String key; private String Default; private String Extra; public String getField() { return Field; } public void setField(String field) { Field = field; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getNull() { return Null; } public void setNull(String aNull) { Null = aNull; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getDefault() { return Default; } public void setDefault(String aDefault) { Default = aDefault; } public String getExtra() { return Extra; } public void setExtra(String extra) { Extra = extra; } }測試類、、
package com.gpdata.ic.usermanagement.admin.datasource.insert; import java.sql.Connection; import java.sql.SQLException; import java.util.List; /** * Created by qws on 2017/5/26/026. */ public class ExecuteSql { /** * 同步全部資料 * * @param fromdate :大於這個時間 * @param todate :小於這個時間 * @param sourcedataName :資源資料庫名稱 * **/ public int executeSql(String fromdate, String todate, String sourcedataName) { //本地資料庫名字 String NativeDataBase= UtilConfig.pre_dataBase+"_"+sourcedataName; Connection natiCon = new ConnectionDateBases().getNativeConnection(); Connection sourceCon = new ConnectionDateBases().getSourceConnection(); SyDateDao dao = new SyDateDao(); List nameList = dao.getTableDao(sourceCon, sourcedataName); //建立資料庫 new SyDateDao().createDataBase(natiCon,sourcedataName); for (int i = 0; i < nameList.size(); i++) { String tableName = nameList.get(i).toString(); //建立表結構 new SyDateDao().moveUse(sourceCon,natiCon,sourcedataName,tableName); //多加一個時間欄位 dao.alertTime(natiCon, UtilConfig.pre_tableName+tableName); //同步資料 dao.launchSyncData(sourceCon, natiCon, tableName, fromdate, todate,NativeDataBase); } if (sourceCon != null) { try { sourceCon.close(); } catch (SQLException e) { e.printStackTrace(); } } if (natiCon != null) { try { natiCon.close(); } catch (SQLException e) { e.printStackTrace(); } } return 0; } public static void main(String[] args) { String fromdate = "2017-05-04 14:34:18"; String todate = "2017-05-27 10:16:44"; new ExecuteSql().executeSql(fromdate, todate, "uuuu"); } }噢哦 對了還有表結構 我這個表結構 以及同步資料 是根據特定的表結構寫的。。。 什麼主鍵只有ID 沒有主外來鍵關聯啊。。。 還有主鍵自增啊 資料庫結構比較簡單些的 具體複雜的話 可以再改裡面同步資料的方法。。。 都是java基礎知識寫的。。
表結構
/*
Navicat MySQL Data Transfer
Source Server : localhost_3306
Source Server Version : 50022
Source Host : 127.0.0.1:3306
Source Database : uuuu
Target Server Type : MYSQL
Target Server Version : 50022
File Encoding : 65001
Date: 2017-05-27 21:50:30
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for data_connect
-- ----------------------------
DROP TABLE IF EXISTS `data_connect`;
CREATE TABLE `data_connect` (
`id` bigint(50) NOT NULL auto_increment COMMENT '接入列表id',
`conDBName` varchar(500) default NULL COMMENT '接入資料庫',
`conTabName` varchar(500) default NULL COMMENT '表名',
`numIncre` bigint(255) default NULL COMMENT '新增資料量',
`numAll` bigint(255) default NULL COMMENT '總資料量',
`conType` varchar(500) default NULL COMMENT '類別',
`conStatus` bigint(50) default NULL COMMENT '接入狀態0:暫停 1:正常接入',
`remark` longtext COMMENT '備註',
`userID` varchar(500) default NULL COMMENT '操作人ID',
`userName` varchar(255) default NULL COMMENT '操作人名字',
`taskStart` datetime default NULL COMMENT '任務起始週期',
`taskEnd` datetime default NULL COMMENT '任務結束週期',
`taskCycle` varchar(50) default NULL COMMENT '執行週期',
`taskTime` datetime default NULL COMMENT '執行時間',
`taskLocal` varchar(500) default NULL COMMENT '任務位置',
`taskId` bigint(50) default NULL COMMENT '對應的任務ID',
`createTime` datetime default NULL COMMENT '建立時間',
`updateTime` datetime default NULL COMMENT '修改時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for data_connect_copy
-- ----------------------------
DROP TABLE IF EXISTS `data_connect_copy`;
CREATE TABLE `data_connect_copy` (
`id` bigint(50) NOT NULL auto_increment COMMENT '接入列表id',
`conDBName` varchar(500) default NULL COMMENT '接入資料庫',
`conTabName` varchar(500) default NULL COMMENT '表名',
`numIncre` bigint(255) default NULL COMMENT '新增資料量',
`numAll` bigint(255) default NULL COMMENT '總資料量',
`conType` varchar(500) default NULL COMMENT '類別',
`conStatus` bigint(50) default NULL COMMENT '接入狀態0:暫停 1:正常接入',
`remark` longtext COMMENT '備註',
`userID` varchar(500) default NULL COMMENT '操作人ID',
`userName` varchar(255) default NULL COMMENT '操作人名字',
`taskStart` datetime default NULL COMMENT '任務起始週期',
`taskEnd` datetime default NULL COMMENT '任務結束週期',
`taskCycle` varchar(50) default NULL COMMENT '執行週期',
`taskTime` datetime default NULL COMMENT '執行時間',
`taskLocal` varchar(500) default NULL COMMENT '任務位置',
`taskId` bigint(50) default NULL COMMENT '對應的任務ID',
`createTime` datetime default NULL COMMENT '建立時間',
`updateTime` datetime default NULL COMMENT '修改時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
具體原始碼就這樣了。