1. 程式人生 > >利用JDBC連線實現跨伺服器跨資料庫跨表資料傳輸

利用JDBC連線實現跨伺服器跨資料庫跨表資料傳輸

  如題

 我現在有兩個伺服器 A 和 B,要把 A 伺服器上某一個資料庫裡所有的表(包含表結構與資料)匯入 B 伺服器;實作上只使用原生 JDBC 完成。

程式碼類:


 回家再擼 目前在公司 。。 下班    哈哈 晚上吃了點飯 就回來晚了 繼續寫

偷笑


      首先JDBC連線類 


package com.gpdata.ic.usermanagement.admin.datasource.insert;

import java.sql.*;

/**
 * Opens JDBC connections to the two servers and releases JDBC resources.
 *
 * Created by qws on 2017/5/26/026.
 */
public class ConnectionDateBases {
    /** JDBC URL of the source ("resource") server, read from UtilConfig. */
    private static String targetUrl = UtilConfig.targetUrl;
    /** JDBC URL of the local server, read from UtilConfig. */
    private static String nativeUrl = UtilConfig.nativeUrl;
    /** Database user name. */
    private static String userName = "root";
    /** Database password. */
    private static String password = "root";
    /** Fully qualified class name of the JDBC driver. */
    private static String driver = "com.mysql.jdbc.Driver";

    /**
     * Opens a connection to the local server.
     *
     * @return the connection, or null when the driver cannot be loaded or the connection fails
     */
    public Connection getNativeConnection() {
        return open(nativeUrl);
    }

    /**
     * Opens a connection to the source server.
     *
     * @return the connection, or null when the driver cannot be loaded or the connection fails
     */
    public Connection getSourceConnection() {
        return open(targetUrl);
    }

    /** Loads the driver and connects to the given URL; prints the stack trace and yields null on failure. */
    private Connection open(String url) {
        try {
            Class.forName(driver);
            return DriverManager.getConnection(url, userName, password);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Closes every non-null argument, in the order result set, prepared statement,
     * statement, connection. A failure to close one resource is printed and does
     * not prevent the remaining ones from being closed.
     *
     * @param conn              connection to close, may be null
     * @param preparedStatement prepared statement to close, may be null
     * @param statement         statement to close, may be null
     * @param rs                result set to close, may be null
     */
    public static void free(Connection conn, PreparedStatement preparedStatement, Statement statement, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
 連線config

package com.gpdata.ic.usermanagement.admin.datasource.insert;

/**
 * Created by qws on 2017/5/27/027.
 */
public class UtilConfig {
    /** Prefix used when naming the cloned database. */
    static final String pre_dataBase = "gp_clone";

    /** Prefix used when naming the cloned tables. */
    static final String pre_tableName = "gp_";

    /** JDBC URL of the source database. */
    static final String targetUrl = "jdbc:mysql://127.0.0.1:3306/uuuu";

    /** JDBC URL of the local database. */
    static final String nativeUrl = "jdbc:mysql://127.0.0.1:3306/uuuu";

}   // ConnectionDateBases reads its JDBC URLs from this class.

 Dao 層

 資料庫的一些操作 裡面有詳細描述

package com.gpdata.ic.usermanagement.admin.datasource.insert;


import com.gpdata.ic.usermanagement.admin.entity.Tablestructure;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by qws on 2017/5/26/026.
 */
/**
 * Data-access helpers for the table copy: metadata lookups, DDL execution and
 * the row-by-row copy itself.
 *
 * Every method prints SQL failures with printStackTrace instead of rethrowing.
 * Statements and result sets are released in finally blocks so they are closed
 * on the failure path as well. Table and database names are concatenated into
 * the SQL text because identifiers cannot be bound as parameters; they must
 * come from trusted input.
 */
public class SyDateDao {
    /** Number of source rows read per page by launchSyncData. */
    private static final int PAGE_SIZE = 1000;

    ConnectionDateBases dataBase = new ConnectionDateBases();

    /**
     * Lists the database names visible on the given connection.
     *
     * NOTE(review): as in the original code, the caller's connection is closed
     * after a successful read (and left open on failure). No other method of
     * this class closes the connection it is given - confirm this is intended.
     *
     * @param con open connection
     * @return database names; empty or partial when the query fails
     */
    public List getDateBaseDao(Connection con) {
        PreparedStatement ptst = null;
        ResultSet rs = null;
        List<String> datename = new ArrayList<String>();
        boolean completed = false;
        try {
            ptst = con.prepareStatement("show databases");
            rs = ptst.executeQuery();
            while (rs.next()) {
                datename.add(rs.getString("Database"));
            }
            completed = true;
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(completed ? con : null, ptst, null, rs);
        }
        return datename;
    }

    /**
     * Executes the CREATE DATABASE statement built by SqlUtil.createDataBase.
     *
     * @param con            connection on which the database is created
     * @param sourcedataname source database name; SqlUtil adds the configured prefix
     */
    public void createDataBase(Connection con, String sourcedataname) {
        PreparedStatement ptst = null;
        String sql = new SqlUtil().createDataBase(sourcedataname);
        try {
            ptst = con.prepareStatement(sql);
            ptst.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, null);
        }
    }

    /**
     * Lists the tables of one schema using information_schema.tables.
     *
     * @param con      open connection
     * @param dataname schema (database) name; bound as a parameter
     * @return table names; empty or partial when the query fails
     */
    public List getTableDao(Connection con, String dataname) {
        PreparedStatement ptst = null;
        ResultSet rs = null;
        List<String> tables = new ArrayList<String>();
        String sql = "SELECT TABLE_NAME FROM information_schema.tables t WHERE t.table_schema = ?";
        try {
            ptst = con.prepareStatement(sql);
            ptst.setString(1, dataname);
            rs = ptst.executeQuery();
            while (rs.next()) {
                tables.add(rs.getString("TABLE_NAME"));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, rs);
        }
        return tables;
    }

    /**
     * Counts the rows of a table, optionally restricted to an open createtime interval.
     *
     * @param con      open connection
     * @param table    table name
     * @param fromDate exclusive lower bound on createtime; the filter is applied only
     *                 when both bounds are non-null
     * @param toDate   exclusive upper bound on createtime
     * @return a list holding the count as its single element, or null when the query fails
     */
    public List getTableCount(Connection con, String table, String fromDate, String toDate) {
        PreparedStatement ptst = null;
        ResultSet rs = null;
        List<Object> count = new ArrayList<Object>();
        boolean ranged = fromDate != null && toDate != null;
        try {
            String sql = "select count(1) from  " + table;
            if (ranged) {
                sql += " where createtime > ? and createtime < ?";
            }
            ptst = con.prepareStatement(sql);
            if (ranged) {
                ptst.setString(1, fromDate);
                ptst.setString(2, toDate);
            }
            rs = ptst.executeQuery();
            while (rs.next()) {
                count.add(rs.getObject("count(1)"));
            }
        } catch (SQLException e) {
            // Callers still receive null, but the cause is no longer lost.
            e.printStackTrace();
            return null;
        } finally {
            ConnectionDateBases.free(null, ptst, null, rs);
        }
        return count;
    }

    /**
     * Lists the column names of a table in the order reported by "desc".
     *
     * @param con   open connection
     * @param table table name, optionally qualified with the database name
     * @return column names; empty or partial when the query fails
     */
    public List Find_table_field(Connection con, String table) {
        PreparedStatement ptst = null;
        ResultSet rs = null;
        List<Object> field = new ArrayList<Object>();
        try {
            ptst = con.prepareStatement("desc " + table);
            rs = ptst.executeQuery();
            while (rs.next()) {
                field.add(rs.getObject("field"));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, rs);
        }
        return field;
    }

    /**
     * Adds the ingestion-time column by executing the statement from SqlUtil.alertSql.
     *
     * @param con       connection to the database that owns the table
     * @param tableName table to alter
     */
    public void alertTime(Connection con, String tableName) {
        PreparedStatement ptst = null;
        String sql = new SqlUtil().alertSql(tableName);
        try {
            ptst = con.prepareStatement(sql);
            ptst.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, null);
        }
    }

    /**
     * Empties a table with TRUNCATE (used before a full copy).
     *
     * @param con       connection to the database that owns the table
     * @param tableName table to truncate
     */
    public void trunTable(Connection con, String tableName) {
        PreparedStatement ptst = null;
        try {
            ptst = con.prepareStatement("TRUNCATE " + tableName);
            ptst.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, null);
        }
    }

    /**
     * Copies the rows of one source table into its prefixed target table.
     *
     * When either date is null the target table is truncated and every row is
     * copied with the statement from SqlUtil.preSql; when both dates are given
     * only rows with fromdate &lt; createtime &lt; todate are copied with the
     * statement from SqlUtil.addPreSql. Rows are read in pages of PAGE_SIZE and
     * written one at a time.
     *
     * @param coreConnection connection to the source database
     * @param targetConn     connection to the target database
     * @param tableName      source table name, without the target prefix
     * @param fromdate       exclusive lower bound on createtime, or null
     * @param todate         exclusive upper bound on createtime, or null
     * @param nativeSource   name of the target database
     */
    public void launchSyncData(Connection coreConnection, Connection targetConn, String tableName, String fromdate, String todate, String nativeSource) {
        boolean incremental = fromdate != null && todate != null;
        if (!incremental) {
            this.trunTable(targetConn, UtilConfig.pre_tableName + tableName);
        }
        // Columns of the target table.
        List<String> field = Find_table_field(targetConn, nativeSource + "." + UtilConfig.pre_tableName + tableName);
        List list = getTableCount(coreConnection, tableName, fromdate, todate);
        if (list == null || list.isEmpty()) {
            // The count query failed; previously this ended in a NullPointerException.
            System.err.println("Skipping sync of " + tableName + ": source row count is unavailable");
            return;
        }
        int size = Integer.parseInt(list.get(0).toString());
        int fieldCount = field.size();
        String preSql = incremental ? new SqlUtil().addPreSql(tableName) : new SqlUtil().preSql(tableName);

        String seleSql = "select *  from " + tableName;
        if (incremental) {
            seleSql += " where createtime > ? and createtime < ?";
        }
        seleSql += " limit ?," + PAGE_SIZE;

        PreparedStatement corePstmt = null;
        PreparedStatement targetPstmt = null;
        try {
            corePstmt = coreConnection.prepareStatement(seleSql);
            targetPstmt = targetConn.prepareStatement(preSql);
            int pages = size / PAGE_SIZE + 1;
            for (int i = 0; i < pages; i++) {
                int param = 1;
                if (incremental) {
                    corePstmt.setString(param++, fromdate);
                    corePstmt.setString(param++, todate);
                }
                corePstmt.setInt(param, i * PAGE_SIZE);
                ResultSet coreRs = corePstmt.executeQuery();
                try {
                    while (coreRs.next()) {
                        // Column 1 is always written as NULL; in the sample schema it is the
                        // auto-increment id - presumably so the target assigns its own key.
                        targetPstmt.setObject(1, null);
                        // Columns 2 .. fieldCount-1 are copied by position. The last target
                        // column is filled by CURRENT_TIMESTAMP inside the statement itself.
                        // Assumes the target has exactly one column more than the source.
                        for (int col = 2; col < fieldCount; col++) {
                            targetPstmt.setObject(col, coreRs.getObject(col));
                        }
                        targetPstmt.execute();
                    }
                } finally {
                    ConnectionDateBases.free(null, null, null, coreRs);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, corePstmt, null, null);
            ConnectionDateBases.free(null, targetPstmt, null, null);
        }
    }

    /**
     * by ysh - reads the full "describe" output of one table.
     *
     * @param con          open connection
     * @param databasename database that owns the table
     * @param tablename    table name
     * @return one Tablestructure per column; empty or partial when the query fails
     */
    public List<Tablestructure> listsql(Connection con, String databasename, String tablename) {
        PreparedStatement ptst = null;
        ResultSet rs = null;
        List<Tablestructure> field = new ArrayList<Tablestructure>();
        final String sql = "describe " + databasename + "." + tablename + "";
        try {
            ptst = con.prepareStatement(sql);
            rs = ptst.executeQuery();
            while (rs.next()) {
                Tablestructure tablestructure = new Tablestructure();
                tablestructure.setField(rs.getString("Field"));
                tablestructure.setType(rs.getString("type"));
                tablestructure.setNull(rs.getString("Null"));
                tablestructure.setKey(rs.getString("key"));
                tablestructure.setDefault(rs.getString("Default"));
                tablestructure.setExtra(rs.getString("Extra"));
                field.add(tablestructure);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, ptst, null, rs);
        }
        return field;
    }

    /**
     * Switches the target connection to the cloned database, drops the prefixed
     * table if it exists and recreates it from the source table's structure.
     *
     * @param source         connection to the source database (read for the structure)
     * @param con            connection to the target server
     * @param sourcebaseName source database name
     * @param tableName      source table name, without the target prefix
     */
    public void moveUse(Connection source, Connection con, String sourcebaseName, String tableName) {
        PreparedStatement move_ptst = null;
        PreparedStatement drop_ptst = null;
        PreparedStatement table_ptst = null;
        String move_sql = new SqlUtil().useSQL(UtilConfig.pre_dataBase + "_" + sourcebaseName);
        String drop_sql = new SqlUtil().Tableexist(UtilConfig.pre_tableName + tableName);
        String create_sql = new SqlUtil().allSql(source, sourcebaseName, tableName);
        try {
            move_ptst = con.prepareStatement(move_sql);
            move_ptst.execute();
            drop_ptst = con.prepareStatement(drop_sql);
            drop_ptst.execute();
            table_ptst = con.prepareStatement(create_sql);
            table_ptst.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            ConnectionDateBases.free(null, table_ptst, null, null);
            ConnectionDateBases.free(null, drop_ptst, null, null);
            ConnectionDateBases.free(null, move_ptst, null, null);
        }
    }
}

  資料同步採用分頁處理,每次處理 1000 筆(因為我還不會用多執行緒,所以先這樣做)。

  拼接SQL的類

package com.gpdata.ic.usermanagement.admin.datasource.insert;


import com.gpdata.ic.usermanagement.admin.entity.Tablestructure;
import java.sql.Connection;
import java.util.List;

/**
 * Created by qws on 2017/5/26/026.
 */
/**
 * Builds the SQL text used by the copy: DDL for the cloned database and tables
 * and the parameterised insert statements.
 *
 * Constructing an instance no longer opens any connection. The source
 * connection needed to read column names is opened per call and closed again.
 */
public class SqlUtil {

    /**
     * Builds the ALTER TABLE statement that adds the ingestion-time column gp_updatetime.
     *
     * @param tableName table to alter
     * @return the ALTER TABLE statement
     */
    public String alertSql(String tableName) {
        return "ALTER table " + tableName + " add     gp_updatetime timestamp null";
    }

    /**
     * Full copy: builds a REPLACE INTO statement for the prefixed target table.
     *
     * @param tableName source table name, without the target prefix
     * @return statement with one placeholder per source column plus CURRENT_TIMESTAMP
     *         for gp_updatetime
     */
    public String preSql(String tableName) {
        return buildInsertSql("  REPLACE INTO ", tableName);
    }

    /**
     * Incremental copy: builds an INSERT INTO statement for the prefixed target table.
     *
     * @param tableName source table name, without the target prefix
     * @return statement with one placeholder per source column plus CURRENT_TIMESTAMP
     *         for gp_updatetime
     */
    public String addPreSql(String tableName) {
        return buildInsertSql("INSERT INTO ", tableName);
    }

    /** Reads the source table's column names and assembles "verb target(cols, gp_updatetime) values(?, ..., CURRENT_TIMESTAMP)". */
    private String buildInsertSql(String verb, String tableName) {
        String natiTable = UtilConfig.pre_tableName + tableName;
        Connection sourceCon = new ConnectionDateBases().getSourceConnection();
        List list;
        try {
            list = new SyDateDao().Find_table_field(sourceCon, tableName);
        } finally {
            // Opened only for this lookup, so release it here.
            ConnectionDateBases.free(sourceCon, null, null, null);
        }
        StringBuilder sb = new StringBuilder();
        sb.append(verb).append(natiTable).append("(");
        for (int i = 0; i < list.size(); i++) {
            sb.append(list.get(i)).append(",");
        }
        sb.append("gp_updatetime )values(");
        for (int i = 0; i < list.size(); i++) {
            sb.append("?,");
        }
        sb.append("CURRENT_TIMESTAMP )");
        return sb.toString();
    }

    /**
     * by ysh - builds the CREATE TABLE statement for the prefixed copy of a table,
     * from the "describe" output of the original.
     *
     * Each column keeps its name, type, NOT NULL flag, AUTO_INCREMENT flag and a
     * DEFAULT NULL clause. All primary-key columns go into a single table-level
     * PRIMARY KEY clause at the end, which also covers composite and
     * non-auto-increment keys. Non-null default values, indexes, foreign keys
     * and comments are not reproduced.
     *
     * @param con          connection used to describe the original table
     * @param databasename database that owns the original table
     * @param tablename    original table name
     * @return the CREATE TABLE statement
     * @throws IllegalStateException when no column could be read for the table
     */
    public String allSql(Connection con, String databasename, String tablename) {
        List<Tablestructure> list = new SyDateDao().listsql(con, databasename, tablename);
        if (list.isEmpty()) {
            throw new IllegalStateException("No columns found for " + databasename + "." + tablename);
        }
        StringBuilder columns = new StringBuilder();
        StringBuilder primaryKey = new StringBuilder();
        for (Tablestructure tablestructure : list) {
            String key = tablestructure.getKey() == null ? "" : tablestructure.getKey().trim();
            String nullable = tablestructure.getNull() == null ? "" : tablestructure.getNull().trim();
            boolean isPrimary = key.contains("PRI");

            if (columns.length() > 0) {
                columns.append(",");
            }
            columns.append("`").append(tablestructure.getField()).append("`  ")
                    .append(tablestructure.getType()).append(" ");
            if ("NO".equals(nullable)) {
                columns.append("NOT NULL ");
            }
            if (isPrimary && "auto_increment".equals(tablestructure.getExtra())) {
                columns.append("AUTO_INCREMENT ");
            }
            if (tablestructure.getDefault() == null && "YES".equals(nullable)) {
                columns.append("DEFAULT NULL ");
            }
            if (isPrimary) {
                if (primaryKey.length() > 0) {
                    primaryKey.append(",");
                }
                primaryKey.append("`").append(tablestructure.getField()).append("`");
            }
        }
        if (primaryKey.length() > 0) {
            columns.append(", PRIMARY KEY (").append(primaryKey).append(")");
        }
        return "CREATE TABLE " + UtilConfig.pre_tableName + tablename + " (" + columns + " ) DEFAULT CHARSET=utf8";
    }

    /**
     * Builds the CREATE DATABASE statement; the name is the configured prefix,
     * an underscore and the given name.
     *
     * @param dataBaseName source database name
     * @return the CREATE DATABASE statement
     */
    public String createDataBase(String dataBaseName) {
        return "create   database  " + UtilConfig.pre_dataBase + "_" + dataBaseName;
    }

    /**
     * by ysh - builds the DROP TABLE IF EXISTS statement for a table.
     *
     * @param tablename table to drop
     * @return the DROP TABLE statement
     */
    public String Tableexist(String tablename) {
        return " DROP TABLE IF EXISTS " + tablename + ";";
    }

    /**
     * Builds the USE statement that selects a database.
     *
     * @param nativeName database to select
     * @return the USE statement
     */
    public String useSQL(String nativeName) {
        return "use " + nativeName + " ; ";
    }

}

這個  拼接表結構的實體類。。。。 寫的不好 哈哈  不是我寫的 (*^__^*) 嘻嘻……

package com.gpdata.ic.usermanagement.admin.entity;

/**
 * Created by yushuanghong on 2017/5/27.
 */
/**
 * One row of MySQL "describe" output: the description of a single column.
 * All values are kept as the strings read from the result set.
 */
public class Tablestructure {

    /** Column name ("Field"). */
    private String fieldName;
    /** Column type ("Type"). */
    private String columnType;
    /** Nullability flag ("Null"). */
    private String nullable;
    /** Key flag ("Key"). */
    private String keyFlag;
    /** Default value ("Default"). */
    private String defaultValue;
    /** Extra information ("Extra"). */
    private String extraInfo;

    /** @return the column name */
    public String getField() {
        return this.fieldName;
    }

    /** @param field the column name */
    public void setField(String field) {
        this.fieldName = field;
    }

    /** @return the column type */
    public String getType() {
        return this.columnType;
    }

    /** @param type the column type */
    public void setType(String type) {
        this.columnType = type;
    }

    /** @return the nullability flag */
    public String getNull() {
        return this.nullable;
    }

    /** @param aNull the nullability flag */
    public void setNull(String aNull) {
        this.nullable = aNull;
    }

    /** @return the key flag */
    public String getKey() {
        return this.keyFlag;
    }

    /** @param key the key flag */
    public void setKey(String key) {
        this.keyFlag = key;
    }

    /** @return the default value, possibly null */
    public String getDefault() {
        return this.defaultValue;
    }

    /** @param aDefault the default value, possibly null */
    public void setDefault(String aDefault) {
        this.defaultValue = aDefault;
    }

    /** @return the extra information */
    public String getExtra() {
        return this.extraInfo;
    }

    /** @param extra the extra information */
    public void setExtra(String extra) {
        this.extraInfo = extra;
    }
}
  測試類、、 

package com.gpdata.ic.usermanagement.admin.datasource.insert;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
 * Created by qws on 2017/5/26/026.
 */
/**
 * Entry point of the copy: clones every table of one source database into the
 * prefixed database on the local server.
 */
public class ExecuteSql {

    /**
     * Copies all tables of a source database.
     *
     * For every table of the source database the prefixed table is recreated,
     * the gp_updatetime column is added, and the rows are copied. Both
     * connections are closed before the method returns, also when a step throws.
     *
     * @param fromdate       only rows created after this time are copied; null for a full copy
     * @param todate         only rows created before this time are copied; null for a full copy
     * @param sourcedataName name of the source database
     * @return always 0
     * @throws IllegalStateException when either database connection cannot be opened
     */
    public int executeSql(String fromdate, String todate, String sourcedataName) {
        // Name of the local (target) database.
        String NativeDataBase = UtilConfig.pre_dataBase + "_" + sourcedataName;
        Connection natiCon = null;
        Connection sourceCon = null;
        try {
            natiCon = new ConnectionDateBases().getNativeConnection();
            sourceCon = new ConnectionDateBases().getSourceConnection();
            if (natiCon == null || sourceCon == null) {
                // The connection helpers return null on failure; fail here with a clear message.
                throw new IllegalStateException("Could not open the native and/or source database connection");
            }
            SyDateDao dao = new SyDateDao();
            List nameList = dao.getTableDao(sourceCon, sourcedataName);
            // Create the target database.
            dao.createDataBase(natiCon, sourcedataName);
            for (int i = 0; i < nameList.size(); i++) {
                String tableName = nameList.get(i).toString();
                // Create the table structure.
                dao.moveUse(sourceCon, natiCon, sourcedataName, tableName);
                // Add the extra time column.
                dao.alertTime(natiCon, UtilConfig.pre_tableName + tableName);
                // Copy the rows.
                dao.launchSyncData(sourceCon, natiCon, tableName, fromdate, todate, NativeDataBase);
            }
        } finally {
            ConnectionDateBases.free(sourceCon, null, null, null);
            ConnectionDateBases.free(natiCon, null, null, null);
        }
        return 0;
    }

    /** Runs an incremental copy of database "uuuu" for a fixed sample time window. */
    public static void main(String[] args) {
        String fromdate = "2017-05-04 14:34:18";
        String todate = "2017-05-27 10:16:44";
        new ExecuteSql().executeSql(fromdate, todate, "uuuu");
    }

}
  對了,還有表結構:這裡的建表與同步資料邏輯是針對特定的表結構寫的——主鍵只有自增的 ID,沒有主外鍵關聯,資料庫結構比較簡單。如果結構比較複雜,可以再修改裡面同步資料的方法。整體都是用 Java 基礎知識寫的。

  表結構

/*
Navicat MySQL Data Transfer


Source Server         : localhost_3306
Source Server Version : 50022
Source Host           : 127.0.0.1:3306
Source Database       : uuuu


Target Server Type    : MYSQL
Target Server Version : 50022
File Encoding         : 65001


Date: 2017-05-27 21:50:30
*/


-- Turn off foreign-key enforcement for the statements that follow.
-- The setting is not switched back on anywhere in this script.
SET FOREIGN_KEY_CHECKS=0;


-- ----------------------------
-- Table structure for data_connect
-- ----------------------------
-- Sample source table for the copy tool: a single auto-increment primary key
-- (`id`), no foreign keys, and a `createTime` column.
-- Any existing table of the same name is dropped first.
DROP TABLE IF EXISTS `data_connect`;
CREATE TABLE `data_connect` (
  `id` bigint(50) NOT NULL auto_increment COMMENT '接入列表id',
  `conDBName` varchar(500) default NULL COMMENT '接入資料庫',
  `conTabName` varchar(500) default NULL COMMENT '表名',
  `numIncre` bigint(255) default NULL COMMENT '新增資料量',
  `numAll` bigint(255) default NULL COMMENT '總資料量',
  `conType` varchar(500) default NULL COMMENT '類別',
  `conStatus` bigint(50) default NULL COMMENT '接入狀態0:暫停  1:正常接入',
  `remark` longtext COMMENT '備註',
  `userID` varchar(500) default NULL COMMENT '操作人ID',
  `userName` varchar(255) default NULL COMMENT '操作人名字',
  `taskStart` datetime default NULL COMMENT '任務起始週期',
  `taskEnd` datetime default NULL COMMENT '任務結束週期',
  `taskCycle` varchar(50) default NULL COMMENT '執行週期',
  `taskTime` datetime default NULL COMMENT '執行時間',
  `taskLocal` varchar(500) default NULL COMMENT '任務位置',
  `taskId` bigint(50) default NULL COMMENT '對應的任務ID',
  `createTime` datetime default NULL COMMENT '建立時間',
  `updateTime` datetime default NULL COMMENT '修改時間',
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- ----------------------------
-- Table structure for data_connect_copy
-- ----------------------------
-- Same column definitions as data_connect above; only the table name differs.
DROP TABLE IF EXISTS `data_connect_copy`;
CREATE TABLE `data_connect_copy` (
  `id` bigint(50) NOT NULL auto_increment COMMENT '接入列表id',
  `conDBName` varchar(500) default NULL COMMENT '接入資料庫',
  `conTabName` varchar(500) default NULL COMMENT '表名',
  `numIncre` bigint(255) default NULL COMMENT '新增資料量',
  `numAll` bigint(255) default NULL COMMENT '總資料量',
  `conType` varchar(500) default NULL COMMENT '類別',
  `conStatus` bigint(50) default NULL COMMENT '接入狀態0:暫停  1:正常接入',
  `remark` longtext COMMENT '備註',
  `userID` varchar(500) default NULL COMMENT '操作人ID',
  `userName` varchar(255) default NULL COMMENT '操作人名字',
  `taskStart` datetime default NULL COMMENT '任務起始週期',
  `taskEnd` datetime default NULL COMMENT '任務結束週期',
  `taskCycle` varchar(50) default NULL COMMENT '執行週期',
  `taskTime` datetime default NULL COMMENT '執行時間',
  `taskLocal` varchar(500) default NULL COMMENT '任務位置',
  `taskId` bigint(50) default NULL COMMENT '對應的任務ID',
  `createTime` datetime default NULL COMMENT '建立時間',
  `updateTime` datetime default NULL COMMENT '修改時間',
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

具體原始碼就這樣了。