實時抽取mysql的資料工具----canal(一)
1、準備:
github:https://github.com/alibaba/canal
裡面有包括canal的文件,server端 client端的 例子 原始碼包等等。
2、canal概述:
canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支援mysql5.x和oracle部分版本的日誌解析
基於日誌增量訂閱&消費支援的業務:
資料庫映象
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務訊息
keyword:資料庫同步,增量訂閱&消費。
3、canal工作原理:
從上層來看,複製分成三步:
master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重做中繼日誌中的事件,將改變反映它自己的資料。
4、部署canal:
部署canal-server:
(1)開啟mysql的binlog功能,並配置binlog模式為row。
在my.cnf 加入如下:
[mysqld]
log-bin=mysql-bin #新增這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複
(2)在mysql中 配置canal資料庫管理使用者,配置相應許可權(repication許可權)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
(3)下載canal https://github.com/alibaba/canal/releases
解壓到相應資料夾
tar -zxvf canal canal
canal 檔案目錄結構
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
修改配置 instance.properties
vim canal/conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info,需要改成自己的資料庫資訊
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password,需要改成自己的資料庫資訊
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
#################################################
然後cd到bin目錄 啟動和停止canal-server
啟動
./startup.sh
停止
./stop.sh
驗證啟動狀態,檢視log檔案
vim canal/log/canal/canal.log
2014-07-18 10:21:08.525 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2014-07-18 10:21:08.609 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]
2014-07-18 10:21:09.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
上述日誌資訊顯示啟動canal成功
執行canal-client例項:
(1)建立例項maven工程
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
(2)新增pom依賴:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
(3)更新依賴 mvn install
(4)canal-client.java 例項程式碼
/**
* Created by hp on 14-7-17.
*/
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.jetbrains.annotations.NotNull;
public class ClientSample {
public static void main(String args[]) {
// 建立連結
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾資料
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(@NotNull List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(@NotNull List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
(5)執行java例項
啟動後看到控制端資訊:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
(6)觸發資料庫變更
create table test (
uid int (4) primary key not null auto_increment,
name varchar(10) not null);
insert into test (name) values('10');
(7)client 抓取mysql資訊:
================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT
uid : 7 update=false
name : 10 update=false
empty count : 1
empty count : 2
5、部署過程中產生問題:
(1)啟動失敗,log日誌中地址正在使用
1、11111埠正在被佔用 可以用 ls -i:11111 檢視監聽程序誰佔用埠 或者 用 ps -ef | grep 11111 檢視哪個程序佔用埠號 然後 kill -9 程序號 殺掉佔用程序
2、可以編輯 canal/conf/canal.properties 中的埠號 ,改為不佔用的埠
(2)canal無法抓取mysql觸發資料庫改變的資訊
1、檢查mysql是否開啟binlog寫入功能 檢查binlog 是否為行模式。
show variables like "binlog_format"
2、檢查my.cnf 和 instance.properties 等配置檔案填寫資訊是否正確。
3、檢查client 程式碼 除錯例項程式碼
4、版本相容問題,canal 1.8 換成 canal 1.7 繼續測試
5、檢視所有日誌檔案 分析日誌