基於canal的實時資料同步
阿新 • • 發佈:2018-12-23
適用場景
使用canal做資料備份而不用mysql自帶的主從備份的場景主要為:
- 跨資料庫的資料備份,例如mysql => oracle
- 資料異構,即對同一份資料做不同的分庫分表查詢。例如賣家和買家各自分庫索引
maven
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.2</version>
</dependency>
java
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal. client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang.StringUtils;
public class SimpleCanalClient {
public static void main(String[] args) throws Exception {
String destination = "example";
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), destination, "", "");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int batchSize = 5 * 1024;
while (true) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
synchronizedData(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾資料
}
}
/**
* 同步資料
* @param entries
* @throws Exception
*/
private static void synchronizedData(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
String sql = getSql(rowChange.getEventType(),tableName,rowData);
System.out.println(sql);
// TODO 執行sql語句
}
}
}
/**
* 獲取增刪改的sql
* @param eventType
* @param tableName
* @param rowData
* @return
*/
private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){
String sql = null;
switch (eventType) {
case INSERT:
sql = getInsertSql(tableName,rowData.getAfterColumnsList());
break;
case UPDATE:
sql = getUpdateSql(tableName,rowData.getAfterColumnsList());
break;
case DELETE:
sql = getDeleteSql(tableName,rowData.getBeforeColumnsList());
break;
default:
break;
}
return sql;
}
private static String getInsertSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String keys = "";
String values = "";
for(int i=0;i<columns.size();i++){
if(i != 0) {
keys += ",";
values += ",";
}
keys += columns.get(i).getName();
values += getValue(columns.get(i));
}
String format = "INSERT INTO %s (%s) VALUES (%s)";
return String.format(format,tableName,keys,values);
}
private static String getUpdateSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String sets = "";
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
if(!StringUtils.isBlank(sets)) {
sets += ",";
}
sets += column.getName() + "=" + getValue(column);
}
String format = "UPDATE %s SET %s WHERE %s";
return String.format(format,tableName,sets,where);
}
private static String getDeleteSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
}
String format = "DELETE FROM %s WHERE %s";
return String.format(format,tableName,where);
}
private static String getValue(Column column){
if(column.getIsNull()){
return "null";
}
return String.format("'%s'",column.getValue());
}
}
資料一致性
單機單點消費mysql的log-bin後直接更新到備份資料庫中,資料一致性沒有問題。但是如果變成分散式環境以及消費mysql的log-bin後將更新資料推到MQ中由多節點消費更新到多個備份資料庫中,則會出現資料更新時序和資料一致性的問題。
而以上程式碼在update sql中除了獲取值變化了的欄位,也反查資料庫獲取了未變化的欄位。因此每次update的sql實際上是該條記錄的全量資料。
通過在表中加上時間戳欄位作為記錄的版本號,用邏輯刪除取代物理刪除delete,修改以上程式碼的sql拼接,insert操作時忽略主鍵衝突、update操作時僅更新版本號(時間戳)舊的記錄,可以極大避免資料不一致的現象,也解決了MQ重複消費的問題。
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
再通過定時任務,每天一次增量資料更新,每週一次全量資料更新,保證資料的最終一致性。