Redis快取與資料庫一致性方案
使用Redis快取的模式的有很多種,下面就逐一介紹。
一、資料庫和redis分別處理不同的資料型別
資料庫處理要求強一致實時性的資料,例如金融資料、交易資料;
redis處理不要求強一致實時性的資料,例如網站最熱貼排行榜;
二、Cache-Aside模式
Cache-Aside模式的意思是業務程式碼直接維護快取,這是最常用的一類模式。
2.1 讀場景
先從快取獲取資料,如果快取沒有命中,則回源
到資料庫獲取源資料。
將資料放入到快取,下次即可從快取中獲取資料。
放入快取的可以是非同步的(建立一個新的執行緒),也可以是同步的,根據實際情況自己選擇。
2.2 非高併發情況下的寫場景
先將資料寫入資料庫,寫入成功後立即同步將資料寫入快取。
2.3 寫多讀少的寫場景
先將資料寫入資料庫,寫入成功後,將快取資料過期/刪除,下次讀取時再載入快取。
這樣的好處是避免了不必要的寫快取操作。
2.4 高併發情況下的寫場景
先寫快取,再定期更新資料庫:
非同步化,先寫入redis的快取,就直接返回;定期或特定動作將資料儲存到mysql,可以做到多次更新,一次儲存。
三、Cache-As-SoR(Redis不支援)
SoR:system of record
,記錄系統,或者叫資料來源;資料庫是資料來源的一種。
Cache-As-SoR即把Cache看作SoR,所有操作都是對Cache進行,然後Cache再委託給SoR進行真實的讀寫。即業務程式碼中只看到Cache的操作,看不到關於SoR相關的程式碼。
3.1 Read-Through
業務程式碼讀Cache,如果不命中,由Cache讀SoR。使用Read-Through
模式需要一個CacheLoader
元件來回源到SoR載入資料。Guava Cache
和Ehcache 3.X
都支援該模式。
3.2 Write-Through
對應Read-Through
,需要有一個CacheWriter
元件來回寫SoR。Guava Cache
不支援,Ehcache 3.X
支援該模式。
3.3 Write-Behind
與 Write-Through
的區別是,Write-Through
是同步寫SoR,Write-Behind
四、訂閱/曾量更新模式
canal是阿里巴巴旗下的一款開源專案,純Java開發。基於資料庫增量日誌解析,提供增量資料訂閱&消費,目前主要支援了MySQL(也支援mariaDB)。
基於日誌增量訂閱&消費支援的業務:
- 資料庫映象
- 資料庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- search build
- 業務cache重新整理
- 價格變化等重要業務訊息
4.1 MySQL主備複製原理
從上層來看,複製分成三步:
master
將改變記錄到二進位制日誌binlog
中(binary log events可以通過show binlog events進行檢視);slave
將master的binlog events
拷貝到它的中繼日誌relay log
;- slave**重做**中繼日誌中的事件,將改變反映它自己的資料。
4.2 canal的工作原理
原理相對比較簡單:
canal
模擬mysql slave
的互動協議,偽裝自己為mysql slave,向mysql master
傳送dump請求
;- mysql master收到dump請求,開始推送binary log給slave(也就是canal);
- canal解析binary log物件;
4.3 通過Canal訂閱MySQL的Binlog並更新Redis
package com.datamip.canal;
import java.awt.Event;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
public class App {
public static void main(String[] args) throws InterruptedException {
// 第一步:與canal進行連線
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.23.170", 11111),
"example", "", "");
connector.connect();
// 第二步:開啟訂閱
connector.subscribe();
// 第三步:迴圈訂閱
while (true) {
try {
// 每次讀取 1000 條
Message message = connector.getWithoutAck(1000);
long batchID = message.getId();
int size = message.getEntries().size();
if (batchID == -1 || size == 0) {
System.out.println("當前暫時沒有資料");
Thread.sleep(1000); // 沒有資料
} else {
System.out.println("-------------------------- 有資料啦 -----------------------");
PrintEntry(message.getEntries());
}
// position id ack (方便處理下一條)
connector.ack(batchID);
} catch (Exception e) {
// TODO: handle exception
} finally {
Thread.sleep(1000);
}
}
}
// 獲取每條列印的記錄
@SuppressWarnings("static-access")
public static void PrintEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 第一步:拆解entry 實體
Header header = entry.getHeader();
EntryType entryType = entry.getEntryType();
// 第二步: 如果當前是RowData,那就是我需要的資料
if (entryType == EntryType.ROWDATA) {
String tableName = header.getTableName();
String schemaName = header.getSchemaName();
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType = rowChange.getEventType();
System.out.println(String.format("當前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType));
// 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那麼sql直接打出來
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
System.out.println("rowchange sql ----->" + rowChange.getSql());
return;
}
// 第三步:追蹤到 columns 級別
rowChange.getRowDatasList().forEach((rowData) -> {
// 獲取更新之前的column情況
List<Column> beforeColumns = rowData.getBeforeColumnsList();
// 獲取更新之後的 column 情況
List<Column> afterColumns = rowData.getAfterColumnsList();
// 當前執行的是 刪除操作
if (eventType == EventType.DELETE) {
PrintColumn(beforeColumns);
}
// 當前執行的是 插入操作
if (eventType == eventType.INSERT) {
PrintColumn(afterColumns);
}
// 當前執行的是 更新操作
if (eventType == eventType.UPDATE) {
PrintColumn(afterColumns);
}
});
}
}
}
// 每個row上面的每一個column 的更改情況
public static void PrintColumn(List<Column> columns) {
columns.forEach((column) -> {
String columnName = column.getName();
String columnValue = column.getValue();
String columnType = column.getMysqlType();
boolean isUpdated = column.getUpdated(); // 判斷 該欄位是否更新
System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,
columnValue, columnType, isUpdated));
});
}
}
需要注意的是,快取的更新會存在延遲,所以快取可根據不一致容忍度設定合理的過期時間
。