1. 程式人生 > >使用mycat/otter基於binlog進行mysql擴充套件資料同步

使用mycat/otter基於binlog進行mysql擴充套件資料同步

目的:將資料庫DBA中的a表,實時同步到DBB庫中的b表,b表與a表相比多了一個新的欄位進行區分;

部署:

部署zk;

新增zk/node/canal到manager控制;

新增資料來源、資料表管理

新增channel/pipeline/對映關係


eventProcessor:

 package com.alibaba.otter.node.extend.processor;
import java.sql.Types;
import java.util.List;


import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
/**
 * 業務自定義處理過程
 *
 * @author jianghang 2012-6-25 下午02:26:36
 * @version 4.1.0
 */
public class TestTranfer extends AbstractEventProcessor {


    /**
     * 自定義處理單條EventData物件
     *
     * @return {@link EventData} 返回值=null,需要忽略該條資料
     */
    public boolean process(EventData eventData){
             EventType et = eventData.getEventType();
        if(et == EventType.INSERT){
            insert(eventData);
        }
        
        if(et == EventType.UPDATE || et == EventType.DELETE){
            EventColumn misc = new EventColumn();
            misc.setColumnValue("122");
            misc.setColumnType(Types.INTEGER);
            misc.setColumnName("addli");
            List<EventColumn> ks = eventData.getKeys();
            ks.add(misc);
        }
        return true;
    }


    private void printList(List<EventColumn> list){
        for(EventColumn t:list){
            System.out.println(t.getColumnName() +" = " + t.getColumnValue());
        }
    }
    
    private void insert(EventData eventData) {
        EventColumn misc = new EventColumn();
        misc.setColumnValue("122");
        misc.setColumnType(Types.INTEGER);
        misc.setColumnName("addli");
        misc.setUpdate(true);
        List<EventColumn> cols = eventData.getUpdatedColumns();
        cols.add(misc);
        eventData.setColumns(cols);
    }
} 

測試用資料庫指令碼:

CREATE DATABASE /*!32312 IF NOT EXISTS*/`mycat_test` /*!40100 DEFAULT CHARACTER SET utf8 */;  
  
USE `mycat_test`;  
  
/*Table structure for table `sam_test` */  
  
DROP TABLE IF EXISTS `sam_test`;  
  
CREATE TABLE `sam_test` (  
  `id_` int(4) NOT NULL AUTO_INCREMENT,  
  `name_` varchar(64) NOT NULL,  
  `user_id` bigint(4) NOT NULL,  
  PRIMARY KEY (`id_`)  
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;  
CREATE DATABASE /*!32312 IF NOT EXISTS*/`mycat_test1` /*!40100 DEFAULT CHARACTER SET utf8 */;  
  
USE `mycat_test1`;  
  
/*Table structure for table `sam_test1` */  
  
DROP TABLE IF EXISTS `sam_test1`;  
  
CREATE TABLE `sam_test1` (  
  `id_` int(4) NOT NULL AUTO_INCREMENT,  
  `name_` varchar(64) NOT NULL,  
  `user_id` bigint(4) NOT NULL, `addli` int(6) not null, 
  PRIMARY KEY (`id_`)  
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;  

相關推薦

使用mycat/otter基於binlog進行mysql擴充套件資料同步

目的:將資料庫DBA中的a表,實時同步到DBB庫中的b表,b表與a表相比多了一個新的欄位進行區分;部署:部署zk;新增zk/node/canal到manager控制;新增資料來源、資料表管理新增channel/pipeline/對映關係eventProcessor: pack

利用binlog恢復mysql資料庫資料

前提:mysql資料庫開啟了binlog日誌      通過  show variables like 'log_%'; 檢視是否開啟binlog日誌。      操作步驟:     在資料庫中

基於binlog恢復mysql資料庫

找到對應時間的binlog檔案 mysqlbinlog --no-defaults --base64-output=decode-rows -v -v /var/lib/mysql/mysql-bin.000538| sed -n '/### DELETE FROM `cl

應對sharding-jdbc結合mybatis實現分庫分表功能 分表的聯合查詢採用將mysql資料同步到elasticsearch進行篩選

應對sharding-jdbc結合mybatis實現分庫分表功能  分表的聯合查詢採用將mysql的資料同步到elasticsearch進行篩選 安裝操作指南:(1)、(2) 其中windows目錄展示如下: 版本控制:1. 需要jdk:1.8(1.8.0_60)

Windows上進行mysql主從資料庫同步總結

主要步驟:主從資料庫同步步驟 解決問題思路: 1.檢查my.ini檔案是否多次設定server-id,my.ini檔案修改不對將導致無法啟動mysql server; 2.檢視C:\ProgramData\MySQL\MySQL Server 5.7\Data資料夾下.err檔案

Windows64環境下 使用Flume將Mysql增量資料同步到Kafka

一.軟體準備1.jdk1.74.maven 下載地址 二.安裝並啟動Kafka1.安裝kafka此步驟看文章,比較詳細,相信不會有問題2.按順序啟動kafka(windows cmd下)    2.1

一次 MySQL資料同步

先說下業務背景,專案中白名單功能重構。這就涉及到老資料同步問題 表結構 CREATE TABLE `nw_white_list` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `user_id` int(11) NOT

solr5.5整合IK分詞及mysql定時資料同步的開發記錄

目錄 1.前言 2.java環境 2.1 安裝jdk 2.1.1 64位安裝 2.1.2 32位安裝 2.1.3 環境變數 2.1.4 重新整理許可權 2.1.5 確認安裝 3 安裝tomcat8 3.1 修改埠號 3.2 設定tomcat-user

mysql資料同步出現Slave_IO_Running:No問題的解決方法小結

 MySQL replication中slave機器上有兩個關鍵的程序,死一個都不行,一個是slave_sql_running,一個是Slave_IO_Running,一個負責與主機的io通訊,一個負責自己的slave mysql程序。 下面寫一下,這兩個要是有no了,怎麼

mysql定時資料同步的實現

        隨著各行業資訊化水平的不斷提升,各種各樣的資訊管理系統都被廣泛使用,各系統間資料完全獨立,形成了大量的資訊孤島。出於管理及決策方面的需求,實現各平臺的資料同步是一個很迫切的需求,TreeSoft資料庫管理系統整合了各主流資料庫的資料訪問及維護監控功能,實現了各

MySQL之間資料同步

MySQL主主同步和主從同步的原理一樣,只是雙方都是主從角色。環境作業系統版本:CentOS7 64位 MySQL版本:mysql5.6.33 節點1IP:192.168.1.205 主機名:edu-mysql-01 節點2IP:192.168.1.206 主機名:edu-m

基於canal的client-adapter資料同步必讀指南

本文將介紹canal專案中client-adapter的使用,以及落地生產中需要考慮的可靠性、高可用與監控報警。(基於canal 1.1.4版本)   canal作為mysql的實時資料訂閱元件,實現了對mysql binlog資料的抓取。 雖然阿里也開源了一個純粹從mysql同步資料到mysql的

基於MYSQLBinlog增量資料同步服務

(by 劉延允) 系統工作原理 基於MYSQL日誌增量資料同步原理: 1、DBAsync偽裝自己為mysql slave,向mysql master傳送dump協議 2、mysql mast

基於 MySQL Binlog 的 Elasticsearch 資料同步實踐 原

一、背景 隨著馬蜂窩的逐漸發展,我們的業務資料越來越多,單純使用 MySQL 已經不能滿足我們的資料查詢需求,例如對於商品、訂單等資料的多維度檢索。 使用 Elasticsearch 儲存業務資料可以很好的解決我們業務中的搜尋需求。而資料進行異構儲存後,隨之而來的就是資料同步的問題。 二、現有方法及問題

MySQL系列:基於binlog的增量訂閱與消費(一)

clas 需要 val tro ali cat tor rip 變化   在一些業務場景中,像在數據分析中我們有時候需要捕獲數據變化(CDC);在數據審計中,我們也往往需要知道數據從這個點到另一個點的變化;同樣在實時分析中,我們有時候需要看到某個值得實時變化等。 要解決以上

mysql 誤刪除 使用binlog 進行回滾

進制 密碼安全 mit 顯示 host 讀取 end 誤刪 nullable mysql> select * from tet3;+----+-------------+| id | dd |+----+-------------+| 1 |

檢視MySQL日誌資料binlog檔案

binlog介紹 binlog,即二進位制日誌,它記錄了資料庫上的所有改變. 改變資料庫的SQL語句執行結束時,將在binlog的末尾寫入一條記錄,同時通知語句解析器,語句執行完畢. binlog格式 基於語句,無法保證所有語句都在從庫執行成功,比如update ... lim

Java api 呼叫Sqoop2進行MySQL-->Hive的資料同步

1.相關jar包 2.一些需要的引數定義在message.properties中 jdbcHiveUrl=jdbc:hive2://10.1.9.91:10000 //hive地址 jdbcHiveDriver=org.apache.hive.jdbc.Hi

mysql database 單向資料同步otter

[[email protected] ~]# wget https://raw.github.com/alibaba/otter/master/manager/deployer/src/main/resources/sql/otter-manager-schema.

基於otter資料同步實驗搭建

目錄 一、實驗目的 二、實驗內容 三、資源劃分 四、元件的安裝配置 4.1 mysql資料庫的安裝 4.1.1前置條件 4.1.2下載地址 4.1.3 安裝 4.1.4 通過命令列連線mysql,修改許可權 4 mysql配置canal許可權 5