canal原始碼分析簡介-3
5.0 store模組
2018-10-08 23:14:588,32871 store模組簡介
store模組用於binlog事件的儲存 ,目前開源的版本中僅實現了Memory記憶體模式。官方文件中提到"後續計劃增加本地file儲存,mixed混合模式”,這句話大家不必當真,從筆者最開始接觸canal到現在已經幾年了,依然沒有動靜,好在Memory記憶體模式已經可以滿足絕大部分場景。
store模組目錄結構如下,該模組的核心介面為CanalEventStore
:
以下是相關類圖:
其中MemoryEventStoreWithBuffer
就是記憶體模式的實現,是我們分析的重點,其實現了CanalEventStore
AbstractCanalStoreScavenge
抽象類。需要注意的是,AbstractCanalStoreScavenge這個類中定義的欄位和方法在開源版本中並沒有任何地方使用到,因此我們不會對其進行分析。
MemoryEventStoreWithBuffer的實現借鑑了Disruptor的RingBuffer。簡而言之,你可以把其當做一個環形佇列,如下:
針對這個環形佇列,canal定義了3類操作:Put、Get、Ack,其中:
-
Put 操作:新增資料。event parser模組拉取到binlog後,並經過event sink模組過濾,最終就通過Put操作儲存到了佇列中。
-
Get操作:獲取資料。canal client連線到canal server後,最終獲取到的binlog都是從這個佇列中取得。
-
Ack操作:確認消費成功。canal client獲取到binlog事件消費後,需要進行Ack。你可以認為Ack操作實際上就是將消費成功的事件從佇列中刪除,如果一直不Ack的話,佇列滿了之後,Put操作就無法新增新的資料了。
對應的,我們需要使用3個變數來記錄Put、Get、Ack這三個操作的位置,其中:
-
putSequence: 每放入一個數據putSequence +1,可表示儲存資料儲存的總數量
-
getSequence: 每獲取一個數據getSequence +1,可表示資料訂閱獲取的最後一次提取位置
-
ackSequence:每確認一個數據ackSequence + 1,可表示資料最後一次消費成功位置
另外,putSequence、getSequence、ackSequence這3個變數初始值都是-1,且都是遞增的,均用long型表示。由於資料只有被Put進來後,才能進行Get;Get之後才能進行Ack。 所以,這三個變數滿足以下關係:
- ackSequence<=getSequence<=putSequence
如果將RingBuffer拉直來看,將會變得更加直觀:
通過對這3個位置進行運算,我們可以得到一些有用的資訊,如:
計算當前可消費的event數量:
- 當前可消費的event數量=putSequence-getSequence
計算當前佇列的大小(即佇列中還有多少事件等待消費):
- 當前佇列的大小=putSequence-ackSequence
在進行Put/Get/Ack操作時,首先都要確定操作到環形佇列的哪個位置。環形佇列的bufferSize預設大小是16384,而這3個操作的位置變數putSequence、getSequence、ackSequence都是遞增的,顯然最終都會超過bufferSize。因此必須要對這3個值進行轉換。最簡單的操作就是使用%進行取餘。
舉例來說,putSequence的當前值為16383,這已經是環形佇列的最大下標了(從0開始計算),下一個要插入的資料要在第16384個位置上,此時可以使用16384 % bufferSize = 0,因此下一個要插入的資料在0號位置上。可見,當達到佇列的最大下標時,再從頭開始迴圈,這也是為什麼稱之為環形佇列的原因。當然在實際操作時,更加複雜,如0號位置上已經有資料了,就不能插入,需要等待這個位置被釋放出來,否則出現數據覆蓋。
canal使用的是通過位操作進行取餘,這種取餘方式與%作用完全相同,只不過因為是位操作,因此更加高效。其計算方式如下:
- 操作位置=sequence&(bufferSize-1)
需要注意的是,這種方式只對除數是2的N次方冪時才有效,如果對於位運算取餘不熟悉,可參考:https://blog.csdn.net/actionzh/article/details/78976082。
在canal.properties檔案中定義了幾個MemoryEventStoreWithBuffer的配置引數,主要用於控制環形佇列的大小和儲存的資料可佔用的最大記憶體,如下:
- canal.instance.memory.buffer.size=16384
- canal.instance.memory.buffer.memunit=1024
- canal.instance.memory.batch.mode=MEMSIZE
其中:
canal.instance.memory.buffer.size:
表示RingBuffer佇列的最大容量,也就是可快取的binlog事件的最大記錄數,其值需要為2的指數(原因如前所述,canal通過位運算進行取餘),預設值為2^16=16384。
canal.instance.memory.buffer.memunit:
表示RingBuffer使用的記憶體單元, 預設是1kb。和canal.instance.memory.buffer.size組合決定最終的記憶體使用大小。需要注意的是,這個配置項僅僅是用於計算佔用總記憶體,並不是限制每個event最大為1kb。
canal.instance.memory.batch.mode:
表示canal記憶體store中資料快取模式,支援兩種方式:
-
ITEMSIZE :根據buffer.size進行限制,只限制記錄的數量。這種方式有一些潛在的問題,舉個極端例子,假設每個event有1M,那麼16384個這種event佔用記憶體要達到16G左右,基本上肯定會造成記憶體溢位(超大記憶體的物理機除外)。
-
MEMSIZE :根據buffer.size* buffer.memunit的大小,限制快取記錄佔用的總記憶體大小。指定為這種模式時,意味著預設快取的event佔用的總記憶體不能超過16384*1024=16M。這個值偏小,但筆者認為也足夠了。因為通常我們在一個伺服器上會部署多個instance,每個instance的store模組都會佔用16M,因此只要instance的數量合適,也就不會浪費記憶體了。部分讀者可能會擔心,這是否限制了一個event的最大大小為16M,實際上是沒有這個限制的。因為canal在Put一個新的event時,只會判斷佇列中已有的event佔用的記憶體是否超過16M,如果沒有,新的event不論大小是多少,總是可以放入的(canal的記憶體計算實際上是不精確的),之後的event再要放入時,如果這個超過16M的event沒有被消費,則需要進行等待。
在canal自帶的instance.xml檔案中,使用了這些配置項來建立MemoryEventStoreWithBuffer例項,如下:
- <beanid="eventStore"class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
- <propertyname="bufferSize"value="${canal.instance.memory.buffer.size:16384}"/>
- <propertyname="bufferMemUnit"value="${canal.instance.memory.buffer.memunit:1024}"/>
- <propertyname="batchMode"value="${canal.instance.memory.batch.mode:MEMSIZE}"/>
- <propertyname="ddlIsolation"value="${canal.instance.get.ddl.isolation:false}"/>
- </bean>
這裡我們還看到了一個ddlIsolation
屬性,其對於Get操作生效,用於設定ddl語句是否單獨一個batch返回(比如下游dml/ddl如果做batch內無序併發處理,會導致結構不一致)。其值通過canal.instance.get.ddl.isolation配置項來設定,預設值為false。
2CanalEventStore介面
通過前面的分析,我們知道了環形佇列要支援三種操作:Put、Get、Ack,針對這三種操作,在CanalEventStore中都有相應的方法定義,如下所示:
com.alibaba.otter.canal.store.CanalEventStore
- /**
- *canel資料儲存介面
- */
- publicinterfaceCanalEventStore<T>extendsCanalLifeCycle,CanalStoreScavenge{
- //==========================Put操作==============================
- /**新增一組資料物件,阻塞等待其操作完成(比如一次性新增一個事務資料)*/
- voidput(List<T>data)throwsInterruptedException,CanalStoreException;
- /**新增一組資料物件,阻塞等待其操作完成或者時間超時(比如一次性新增一個事務資料)*/
- booleanput(List<T>data,longtimeout,TimeUnitunit)throwsInterruptedException,
- CanalStoreException;
- /**新增一組資料物件(比如一次性新增一個事務資料)*/
- booleantryPut(List<T>data)throwsCanalStoreException;
- /**新增一個數據物件,阻塞等待其操作完成*/
- voidput(Tdata)throwsInterruptedException,CanalStoreException;
- /**新增一個數據物件,阻塞等待其操作完成或者時間超時*/
- booleanput(Tdata,longtimeout,TimeUnitunit)throwsInterruptedException,CanalStoreException;
- /**新增一個數據物件*/
- booleantryPut(Tdata)throwsCanalStoreException;
- //==========================GET操作==============================
- /**獲取指定大小的資料,阻塞等待其操作完成*/
- Events<T>get(Positionstart,intbatchSize)throwsInterruptedException,CanalStoreException;
- /**獲取指定大小的資料,阻塞等待其操作完成或者時間超時*/
- Events<T>get(Positionstart,intbatchSize,longtimeout,TimeUnitunit)throws
- InterruptedException,CanalStoreException;
- /**根據指定位置,獲取一個指定大小的資料*/
- Events<T>tryGet(Positionstart,intbatchSize)throwsCanalStoreException;
- //=========================Ack操作==============================
- /**刪除{@linkplainPosition}之前的資料*/
- voidack(Positionposition)throwsCanalStoreException;
- //==========================其他操作==============================
- /**獲取最後一條資料的position*/
- PositiongetLatestPosition()throwsCanalStoreException;
- /**獲取第一條資料的position,如果沒有資料返回為null*/
- PositiongetFirstPosition()throwsCanalStoreException;
- /**出錯時執行回滾操作(未提交ack的所有狀態資訊重新歸位,減少出錯時資料全部重來的成本)*/
- voidrollback()throwsCanalStoreException;
- }
可以看到Put/Get/Ack操作都有多種過載形式,各個方法的作用參考方法註釋即可,後文在分析MemoryEventStoreWithBuffer時,將會進行詳細的介紹。
這裡對 get方法返回的Events
物件,進行一下說明:
com.alibaba.otter.canal.store.model.Events
- publicclassEvents<EVENT>implementsSerializable{
- privatestaticfinallongserialVersionUID=-7337454954300706044L;
- privatePositionRangepositionRange=newPositionRange();
- privateList<EVENT>events=newArrayList<EVENT>();
- //settersgettersandtoString
- }
可以看到,僅僅是通過一個List維護了一組資料,儘管這裡定義的是泛型,但真實放入的資料實際上是Event型別。而PositionRange是protocol模組中的類,描述了這組Event的開始(start)和結束位置(end),顯然,start表示List集合中第一個Event的位置,end表示最後一個Event的位置。
Event
的定義如下所示 :
com.alibaba.otter.canal.store.model.Event
- publicclassEventimplementsSerializable{
- privatestaticfinallongserialVersionUID=1333330351758762739L;
- privateLogIdentitylogIdentity;//記錄資料產生的來源
- privateCanalEntry.Entryentry;
- //constructorsettersgettersandtoString
- }
其中:CanalEntry.Entry和LogIdentity也都是protocol模組中的類:
-
LogIdentity記錄這個Event的來源資訊mysql地址(sourceAddress)和slaveId。
-
CanalEntry.Entry封裝了binlog事件的資料
3MemoryEventStoreWithBuffer
MemoryEventStoreWithBuffer是目前開源版本中的CanalEventStore介面的唯一實現,基於記憶體模式。當然你也可以進行擴充套件,提供一個基於本地檔案儲存方式的CanalEventStore實現。這樣就可以一份資料讓多個業務費進行訂閱,只要獨立維護消費位置元資料即可。然而,我不得不提醒你的是,基於本地檔案的儲存方式,一定要考慮好資料清理工作,否則會有大坑。
如果一個庫只有一個業務方訂閱,其實根本也不用實現本地儲存,使用基於記憶體模式的佇列進行快取即可。如果client消費的快,那麼佇列中的資料放入後就被取走,佇列基本上一直是空的,實現本地儲存也沒意義;如果client消費的慢,佇列基本上一直是滿的,只要client來獲取,總是能拿到資料,因此也沒有必要實現本地儲存。
言歸正傳,下面對MemoryEventStoreWithBuffer的原始碼進行分析。
3.1MemoryEventStoreWithBuffer欄位
首先對MemoryEventStoreWithBuffer中定義的欄位進行一下介紹,這是後面分析其他方法的基礎,如下:
- publicclassMemoryEventStoreWithBufferextendsAbstractCanalStoreScavengeimplements
- CanalEventStore<Event>,CanalStoreScavenge{
- privatestaticfinallongINIT_SQEUENCE=-1;
- privateintbufferSize=16*1024;
- //memsize的單位,預設為1kb大小
- privateintbufferMemUnit=1024;
- privateintindexMask;
- privateEvent[]entries;
- //記錄下put/get/ack操作的三個下標,初始值都是-1
- //代表當前put操作最後一次寫操作發生的位置
- privateAtomicLongputSequence=newAtomicLong(INIT_SQEUENCE);
- //代表當前get操作讀取的最後一條的位置
- privateAtomicLonggetSequence=newAtomicLong(INIT_SQEUENCE);
- //代表當前ack操作的最後一條的位置
- privateAtomicLongackSequence=newAtomicLong(INIT_SQEUENCE);
- //記錄下put/get/ack操作的三個memsize大小
- privateAtomicLongputMemSize=newAtomicLong(0);
- privateAtomicLonggetMemSize=newAtomicLong(0);
- privateAtomicLongackMemSize=newAtomicLong(0);
- //阻塞put/get操作控制訊號
- privateReentrantLocklock=newReentrantLock();
- privateConditionnotFull=lock.newCondition();
- privateConditionnotEmpty=lock.newCondition();
- //預設為記憶體大小模式
- privateBatchModebatchMode=BatchMode.ITEMSIZE;
- privatebooleanddlIsolation=false;
- ...
- }
屬性說明:
bufferSize、bufferMemUnit、batchMode、ddlIsolation、putSequence、getSequence、ackSequence:
這幾個屬性前面已經介紹過,這裡不再贅述。
entries:
型別為Event[]陣列,環形佇列底層基於的Event[]陣列,佇列的大小就是bufferSize。關於如何使用陣列來實現環形佇列,可參考筆者的另一篇文章http://www.tianshouzhi.com/api/tutorials/basicalgorithm/43。
indexMask
用於對putSequence、getSequence、ackSequence進行取餘操作,前面已經介紹過canal通過位操作進行取餘,其值為bufferSize-1 ,參見下文的start方法
putMemSize、getMemSize、ackMemSize:
分別用於記錄put/get/ack操作的event佔用記憶體的累加值,都是從0開始計算。例如每put一個event,putMemSize就要增加這個event佔用的記憶體大小;get和ack操作也是類似。這三個變數,都是在batchMode指定為MEMSIZE的情況下,才會發生作用。
因為都是累加值,所以我們需要進行一些運算,才能得有有用的資訊,如:
計算出當前環形隊列當前佔用的記憶體大小
- 環形隊列當前佔用的記憶體大小=putMemSize-ackMemSize
前面我們提到,batchMode為MEMSIZE時,需要限制環形佇列中event佔用的總記憶體,事實上在執行put操作前,就是通過這種方式計算出來當前大小,然後我們限制的bufferSize*bufferMemUnit大小進行比較。
計算尚未被獲取的事件佔用的記憶體大小
- 尚未被獲取的事件佔用的記憶體大小=putMemSize-getMemSize
batchMode除了對PUT操作有限制,對Get操作也有影響。Get操作可以指定一個batchSize,用於指定批量獲取的大小。當batchMode為MEMSIZE時,其含義就在不再是記錄數,而是要獲取到總共佔用 batchSize *bufferMemUnit 記憶體大小的事件數量。
lock、notFull、notEmpty:
阻塞put/get操作控制訊號。notFull用於控制put操作,只有佇列沒滿的情況下才能put。notEmpty控制get操作,只有佇列不為空的情況下,才能get。put操作和get操作共用一把鎖(lock)。
3.2 啟動和停止方法
MemoryEventStoreWithBuffer實現了CanalLifeCycle
介面,因此實現了其定義的start、stop方法
start啟動方法
start方法主要是初始化MemoryEventStoreWithBuffer內部的環形佇列,其實就是初始化一下Event[]陣列。
- publicvoidstart()throwsCanalStoreException{
- super.start();
- if(Integer.bitCount(bufferSize)!=1){
- thrownewIllegalArgumentException("bufferSizemustbeapowerof2");
- }
- indexMask=bufferSize-1;//初始化indexMask,前面已經介紹過,用於通過位操作進行取餘
- entries=newEvent[bufferSize];//建立迴圈佇列基於的底層陣列,大小為bufferSize
- }
stop停止方法
stop方法作用是停止,在停止時會清空所有快取的資料,將維護的相關狀態變數設定為初始值。
MemoryEventStoreWithBuffer#stop
- publicvoidstop()throwsCanalStoreException{
- super.stop();
- //清空所有快取的資料,將維護的相關狀態變數設定為初始值
- cleanAll();
- }
在停止時,通過呼叫cleanAll
方法清空所有快取的資料。
cleanAll方法是在CanalStoreScavenge介面中定義的,在MemoryEventStoreWithBuffer中進行了實現, 此外這個介面還定義了另外一個方法cleanUtil,在執行ack操作時會被呼叫,我們將在介紹ack方法時進行講解。
MemoryEventStoreWithBuffer#cleanAll
- publicvoidcleanAll()throwsCanalStoreException{
- finalReentrantLocklock=this.lock;
- lock.lock();
- try{
- //將Put/Get/Ack三個操作的位置都重置為初始狀態-1
- putSequence.set(INIT_SQEUENCE);
- getSequence.set(INIT_SQEUENCE);
- ackSequence.set(INIT_SQEUENCE);
- //將Put/Get/Ack三個操作的memSize都重置為0
- putMemSize.set(0);
- getMemSize.set(0);
- ackMemSize.set(0);
- //將底層Event[]陣列置為null,相當於清空所有資料
- entries=null;
- }finally{
- lock.unlock();
- }
- }
4.2 Put操作
前面分析CanalEventStore介面中,我們看到總共有6個put方法,可以分為3類:
-
不帶timeout超時引數的put方法,會一直進行阻塞,直到有足夠的空間可以放入。
-
帶timeout引數超時引數的put方法,如果超過指定時間還未put成功,會丟擲InterruptedException。
-
tryPut方法每次只是嘗試放入資料,立即返回true或者false,不會阻塞。
事實上,這些方法只是超時機制不同,底層都是通過呼叫doPut方法來完成真正的資料放入。因此在後面的分析中,筆者只選擇其中一種進行講解。
所有的put操作,在放入資料之前,都需要進行一些前置檢查工作,主要檢查2點:
1、檢查是否足夠的slot
預設的bufferSize設定大小為16384,即有16384個slot,每個slot可以儲存一個event,因此canal預設最多快取16384個event。從來另一個角度出發,這意味著putSequence最多比ackSequence可以大16384,不能超過這個值。如果超過了,就意味著尚未沒有被消費的資料被覆蓋了,相當於丟失了資料。因此,如果Put操作滿足以下條件時,是不能新加入資料的
- (putSequence+need_put_events_size)-ackSequence>bufferSize
"putSequence +need_put_events_size"的結果為新增資料後的putSequence的最終位置值,要把這個作為預判斷條件,其減去ackSequence,如果大於bufferSize,則不能插入資料。需要等待有足夠的空間,或者丟擲異常。
2、檢測是否超出了記憶體限制
前面我們已經看到了,為了控制佇列中event佔用的總記憶體大小,可以指定batchMode為MEMSIZE。在這種情況下,buffer.size* buffer.memunit
(預設為16M)就表示環形佇列儲存的event總共可以佔用的記憶體大小。因此當出現以下情況下, 不能加入新的event:
- (putMemSize-ackMemSize)>buffer.size*buffer.memunit
關於putMemSize和ackMemSize前面已經介紹過,二者的差值,實際上就是"隊列當前包含的event佔用的總記憶體”。
下面我們選擇可以指定timeout超時時間的put方法進行講解,如下:
- publicbooleanput(List<Event>data,longtimeout,TimeUnitunit)throwsInterruptedException,
- CanalStoreException{
- //1如果需要插入的List為空,直接返回true
- if(data==null||data.isEmpty()){
- returntrue;
- }
- //2獲得超時時間,並通過加鎖進行put操作
- longnanos=unit.toNanos(timeout);
- finalReentrantLocklock=this.lock;
- lock.lockInterruptibly();
- try{
- for(;;){//這是一個死迴圈,執行到下面任意一個return或者丟擲異常是時才會停止
- //3檢查是否滿足插入條件,如果滿足,進入到3.1,否則進入到3.2
- if(checkFreeSlotAt(putSequence.get()+data.size())){
- //3.1如果滿足條件,呼叫doPut方法進行真正的插入
- doPut(data);
- returntrue;
- }
- //3.2判斷是否已經超時,如果超時,則不執行插入操作,直接返回false
- if(nanos<=0){
- returnfalse;
- }
- //3.3如果還沒有超時,呼叫notFull.awaitNanos進行等待,需要其他執行緒呼叫notFull.signal()方法喚醒。
- //喚醒是在ack操作中進行的,ack操作會刪除已經消費成功的event,此時佇列有了空間,因此可以喚醒,詳見ack方法分析
- //當被喚醒後,因為這是一個死迴圈,所以迴圈中的程式碼會重複執行。當插入條件滿足時,呼叫doPut方法插入,然後返回
- try{
- nanos=notFull.awaitNanos(nanos);
- //3.4如果一直等待到超時,都沒有可用空間可以插入,notFull.awaitNanos會丟擲InterruptedException
- }catch(InterruptedExceptionie){
- notFull.signal();//3.5超時之後,喚醒一個其他執行put操作且未被中斷的執行緒(不明白是為了幹啥)
- throwie;
- }
- }
- }finally{
- lock.unlock();
- }
- }
上述方法的第3步,通過呼叫checkFreeSlotAt
方法來執行插入資料前的檢查工作,所做的事情就是我們前面提到的2點:1、檢查是否足夠的slot 2、檢測是否超出了記憶體限制,原始碼如下所示:
MemoryEventStoreWithBuffer#checkFreeSlotAt
- /**查詢是否有空位*/
- privatebooleancheckFreeSlotAt(finallongsequence){
- //1、檢查是否足夠的slot。注意方法引數傳入的sequence值是:當前putSequence值+新插入的event的記錄數。
- //按照前面的說明,其減去bufferSize不能大於ack位置,或者換一種說法,減去bufferSize不能大於ack位置。
- //1.1首先用sequence值減去bufferSize
- finallongwrapPoint=sequence-bufferSize;
- //1.2獲取get位置ack位置的較小值,事實上,ack位置總是應該小於等於get位置,因此這裡總是應該返回的是ack位置。
- finallongminPoint=getMinimumGetOrAck();
- //1.3將1.1與1.2步得到的值進行比較,如果前者大,說明二者差值已經超過了bufferSize,不能插入資料,返回false
- if(wrapPoint>minPoint){//剛好追上一輪
- returnfalse;
- }else{
- //2如果batchMode是MEMSIZE,繼續檢查是否超出了記憶體限制。
- if(batchMode.isMemSize()){
- //2.1使用putMemSize值減去ackMemSize值,得到當前儲存的event事件佔用的總記憶體
- finallongmemsize=putMemSize.get()-ackMemSize.get();
- //2.2如果沒有超出bufferSize*bufferMemUnit記憶體限制,返回true,否則返回false
- if(memsize<bufferSize*bufferMemUnit){
- returntrue;
- }else{
- returnfalse;
- }
- }else{
- //3如果batchMode不是MEMSIZE,說明只限制記錄數,則直接返回true
- returntrue;
- }
- }
- }
getMinimumGetOrAck
方法用於返回getSequence和ackSequence二者的較小值,原始碼如下所示:
MemoryEventStoreWithBuffer#getMinimumGetOrAck
- privatelonggetMinimumGetOrAck(){
- longget=getSequence.get();
- longack=ackSequence.get();
- returnack<=get?ack:get;
- }
如前所述,ackSequence總是應該小於等於getSequence,因此這裡判斷應該是沒有必要的,筆者已經給官方提了issue,也得到了確認,參見:https://github.com/alibaba/canal/issues/966。
當checkFreeSlotAt方法檢驗通過後,最終呼叫的是doPut
方法進行插入。doPut方法主要有4個步驟:
1、將新插入的event資料賦值到Event[]陣列的正確位置上,就算完成了插入
2、當新插入的event記錄數累加到putSequence上
3、累加新插入的event的大小到putMemSize上
4、呼叫notEmpty.signal()方法,通知佇列中有資料了,如果之前有client獲取資料處於阻塞狀態,將會被喚醒
MemoryEventStoreWithBuffer#doPut
- /***執行具體的put操作*/
- privatevoiddoPut(List<Event>data){
- //1將新插入的event資料賦值到Event[]陣列的正確位置上
- //1.1獲得putSequence的當前值current,和插入資料後的putSequence結束值end
- longcurrent=putSequence.get();
- longend=current+data.size();
- //1.2迴圈需要插入的資料,從current位置開始,到end位置結束
- for(longnext=current+1;next<=end;next++){
- //1.3通過getIndex方法對next變數轉換成正確的位置,設定到Event[]陣列中
- //需要轉換的原因在於,這裡的Event[]陣列是環形佇列的底層實現,其大小為bufferSize值,預設為16384。
- //執行一段時間後,接收到的binlog數量肯定會超過16384,每接受到一個event,putSequence+1,因此最終必然超過這個值。
- //而next變數是比當前putSequence值要大的,因此必須進行轉換,否則會陣列越界,轉換工作就是在getIndex方法中進行的。
- entries[getIndex(next)]=data.get((int)(next-current-1));
- }
- //2直接設定putSequence為end值,相當於完成event記錄數的累加
- putSequence.set(end);
- //3累加新插入的event的大小到putMemSize上
- if(batchMode.isMemSize()){
- //用於記錄本次插入的event記錄的大小
- longsize=0;
- //迴圈每一個event
- for(Eventevent:data){
- //通過calculateSize方法計算每個event的大小,並累加到size變數上
- size+=calculateSize(event);
- }
- //將size變數的值,新增到當前putMemSize
- putMemSize.getAndAdd(size);
- }
- //4呼叫notEmpty.signal()方法,通知佇列中有資料了,如果之前有client獲取資料處於阻塞狀態,將會被喚醒
- notEmpty.signal();
- }
上述程式碼中,通過getIndex
方法方法來進行位置轉換,其內部通過位運算來快速取餘數,不再贅述
MemoryEventStoreWithBuffer#getIndex
- privateintgetIndex(longsequcnce){
- return(int)sequcnce&indexMask;//bufferSize-1
- }
對於batchMode是MEMSIZE的情況下, 還會通過calculateSize
方法計算每個event佔用的記憶體大小,累加到putMemSize上。
MemoryEventStoreWithBuffer#calculateSize
- privatelongcalculateSize(Eventevent){
- //直接返回binlog中的事件大小
- returnevent.getEntry().getHeader().getEventLength();
- }
其原理在於,mysql的binlog的event header中,都有一個event_length表示這個event佔用的位元組數。不熟悉mysql binlog event結構的讀者可參考:https://dev.mysql.com/doc/internals/en/event-structure.html
parser模組將二進位制形式binlog event解析後,這個event_length欄位的值也被解析出來了,轉換成Event物件後,在儲存到store模組時,就可以根據其值判斷佔用記憶體大小。
需要注意的是,這個計算並不精確。原始的event_length表示的是event是二進位制位元組流時的位元組數,在轉換成java物件後,基本上都會變大。如何獲取java物件的真實大小,可參考這個部落格:https://www.cnblogs.com/Kidezyq/p/8030098.html。
4.3 Get操作
Put操作是canal parser模組解析binlog事件,並經過sink模組過濾後,放入到store模組中,也就是說Put操作實際上是canal內部呼叫。 Get操作(以及ack、rollback)則不同,其是由client發起的網路請求,server端通過對請求引數進行解析,最終呼叫CanalEventStore模組中定義的對應方法。
Get操作用於獲取指定batchSize大小的Events。提供了3個方法:
- //嘗試獲取,如果獲取不到立即返回
- publicEvents<Event>tryGet(Positionstart,intbatchSize)throwsCanalStoreException
- //獲取指定大小的資料,阻塞等待其操作完成
- publicEvents<Event>get(Positionstart,intbatchSize)throwsInterruptedException,CanalStoreException
- //獲取指定大小的資料,阻塞等待其操作完成或者超時,如果超時了,有多少,返回多少
- publicEvents<Event>get(Positionstart,intbatchSize,longtimeout,TimeUnitunit)
- throwsInterruptedException,CanalStoreException
其中:
-
start引數:其型別為Posisiton,表示從哪個位置開始獲取
-
batchSize引數:表示批量獲取的大小
-
timeout和uint引數:超時引數配置
與Put操作類似,MemoryEventStoreWithBuffer在實現這三個方法時,真正的獲取操作都是在doGet方法中進行的。這裡我們依然只選擇其中一種進行完整的講解:
- publicEvents<Event>get(Positionstart,intbatchSize,longtimeout,TimeUnitunit)
- throwsInterruptedException,CanalStoreException{
- longnanos=unit.toNanos(timeout);
- finalReentrantLocklock=this.lock;
- lock.lockInterruptibly();
- try{
- for(;;){
- if(checkUnGetSlotAt((LogPosition)start,batchSize)){
- returndoGet(start,batchSize);
- }
- if(nanos<=0){
- //如果時間到了,有多少取多少
- returndoGet(start,batchSize);
- }
- try{
- nanos=notEmpty.awaitNanos(nanos);
- }catch(InterruptedExceptionie){
- notEmpty.signal();//propagatetonon-interruptedthread
- throwie;
- }
- }
- }finally{
- lock.unlock();
- }
- }
可以看到,get方法的實現邏輯與put方法整體上是類似的,不再贅述。這裡我們直接關注checkUnGetSlotAt
和doGet
方法。
checkUnGetSlotAt方法,用於檢查是否有足夠的event可供獲取,根據batchMode的不同,有著不同的判斷邏輯
-
如果batchMode為ITEMSIZE,則表示只要有有滿足batchSize數量的記錄數即可,即putSequence - getSequence >= batchSize
-
如果batchMode為MEMSIZE,此時batchSize不再表示記錄數,而是bufferMemUnit的個數,也就是說,獲取到的event列表佔用的總記憶體要達到batchSize * bufferMemUnit,即putMemSize-getMemSize >= batchSize * bufferMemUnit
MemoryEventStoreWithBuffer#checkUnGetSlotAt
- privatebooleancheckUnGetSlotAt(LogPositionstartPosition,intbatchSize){
- //1如果batchMode為ITEMSIZE
- if(batchMode.isItemSize()){
- longcurrent=getSequence.get();
- longmaxAbleSequence=putSequence.get();
- longnext=current;
- //1.1第一次訂閱之後,需要包含一下start位置,防止丟失第一條記錄。
- if(startPosition==null||!startPosition.getPostion().isIncluded()){
- next=next+1;
- }
- //1.2理論上只需要滿足條件:putSequence-getSequence>=batchSize
- //1.2.1先通過current<maxAbleSequence進行一下簡單判斷,如果不滿足,可以直接返回false了
- //1.2.2如果1.2.1滿足,再通過putSequence-getSequence>=batchSize判斷是否有足夠的資料
- if(current<maxAbleSequence&&next+batchSize-1<=maxAbleSequence){
- returntrue;
- }else{
- returnfalse;
- }
- //2如果batchMode為MEMSIZE
- }else{
- longcurrentSize=getMemSize.get();
- longmaxAbleSize=putMemSize.get();
- //2.1需要滿足條件putMemSize-getMemSize>=batchSize*bufferMemUnit
- if(maxAbleSize-currentSize>=batchSize*bufferMemUnit){
- returntrue;
- }else{
- returnfalse;
- }
- }
- }
關於1.1步的描述"第一次訂閱之後,需要包含一下start位置,防止丟失第一條記錄”,這裡進行一下特殊說明。首先要明確checkUnGetSlotAt方法的startPosition引數到底是從哪裡傳遞過來的。
當一個client在獲取資料時,CanalServerWithEmbedded
的getWithoutAck/或get方法會被呼叫。其內部首先通過CanalMetaManager
查詢client的消費位置資訊,由於是第一次,肯定沒有記錄,因此返回null,此時會呼叫CanalEventStore的getFirstPosition()方法,嘗試把第一條資料作為消費的開始。而此時CanalEventStore中可能有資料,也可能沒有資料。在沒有資料的情況下,依然返回null;在有資料的情況下,把第一個Event的位置作為消費開始位置。那麼顯然,傳入checkUnGetSlotAt方法的startPosition引數可能是null,也可能不是null。所以有了以下處理邏輯:
- if(startPosition==null||!startPosition.getPostion().isIncluded()){
- next=next+1;
- }
如果不是null的情況下,儘管把第一個event當做開始位置,但是因為這個event畢竟還沒有消費,所以在消費的時候我們必須也將其包含進去。之所以要+1,因為是第一次獲取,getSequence的值肯定還是初始值-1,所以要+1變成0之後才是佇列的第一個event位置。關於CanalEventStore的getFirstPosition()方法,我們將在最後分析。
當通過checkUnGetSlotAt的檢查條件後,通過doGet方法進行真正的資料獲取操作,獲取主要分為5個步驟:
1、確定從哪個位置開始獲取資料
2、根據batchMode是MEMSIZE還是ITEMSIZE,通過不同的方式來獲取資料
3、設定PositionRange,表示獲取到的event列表開始和結束位置
4、設定ack點
5、累加getSequence,getMemSize值
MemoryEventStoreWithBuffer#doGet
- privateEvents<Event>doGet(Positionstart,intbatchSize)throwsCanalStoreException{
- LogPositionstartPosition=(LogPosition)start;
- //1確定從哪個位置開始獲取資料
- //獲得當前的get位置
- longcurrent=getSequence.get();
- //獲得當前的put位置
- longmaxAbleSequence=putSequence.get();
- //要獲取的第一個Event的位置,一開始等於當前get位置
- longnext=current;
- //要獲取的最後一個event的位置,一開始也是當前get位置,每獲取一個event,end值加1,最大為current+batchSize
- //因為可能進行ddl隔離,因此可能沒有獲取到batchSize個event就返回了,此時end值就會小於current+batchSize
- longend=current;
- //如果startPosition為null,說明是第一次訂閱,預設+1處理,因為getSequence的值是從-1開始的
- //如果tartPosition不為null,需要包含一下start位置,防止丟失第一條記錄
- if(startPosition==null||!startPosition.getPostion().isIncluded()){
- next=next+1;
- }
- //如果沒有資料,直接返回一個空列表
- if(current>=maxAbleSequence){
- returnnewEvents<Event>();
- }
- //2如果有資料,根據batchMode是ITEMSIZE或MEMSIZE選擇不同的處理方式
- Events<Event>result=newEvents<Event>();
- //維護要返回的Event列表
- List<Event>entrys=result.getEvents();
- longmemsize=0;
- //2.1如果batchMode是ITEMSIZE
- if(batchMode.isItemSize()){
- end=(next+batchSize-1)<maxAbleSequence?(next+batchSize-1):maxAbleSequence;
- //2.1.1迴圈從開始位置(next)到結束位置(end),每次迴圈next+1
- for(;next<=end;next++){
- //2.1.2獲取指定位置上的事件
- Eventevent=entries[getIndex(next)];
- //2.1.3果是當前事件是DDL事件,且開啟了ddl隔離,本次事件處理完後,即結束迴圈(if語句最後是一行是break)
- if(ddlIsolation&&isDdl(event.getEntry().getHeader().getEventType())){
- //2.1.4因為ddl事件需要單獨返回,因此需要判斷entrys中是否應添加了其他事件
- if(entrys.size()==0){//如果entrys中尚未新增任何其他event
- entrys.add(event);//加入當前的DDL事件
- end=next;//更新end為當前值
- }else{
- //如果已經添加了其他事件如果之前已經有DML事件,直接返回了,因為不包含當前next這記錄,需要回退一個位置
- end=next-1;//next-1一定大於current,不需要判斷
- }
- break;
- }else{//如果沒有開啟DDL隔離,直接將事件加入到entrys中
- entrys.add(event);
- }
- }
- //2.2如果batchMode是MEMSIZE
- }else{
- //2.2.1計算本次要獲取的event佔用最大位元組數
- longmaxMemSize=batchSize*bufferMemUnit;
- //2.2.2memsize從0開始,當memsize小於maxMemSize且next未超過maxAbleSequence時,可以進行迴圈
- for(;memsize<=maxMemSize&&next<=maxAbleSequence;next++){
- //2.2.3獲取指定位置上的Event
- Eventevent=entries[getIndex(next)];
- //2.2.4果是當前事件是DDL事件,且開啟了ddl隔離,本次事件處理完後,即結束迴圈(if語句最後是一行是break)
- if(ddlIsolation&&isDdl(event.getEntry().getHeader().getEventType())){
- //如果是ddl隔離,直接返回
- if(entrys.size()==0){
- entrys.add(event);//如果沒有DML事件,加入當前的DDL事件
- end=next;//更新end為當前
- }else{
- //如果之前已經有DML事件,直接返回了,因為不包含當前next這記錄,需要回退一個位置
- end=next-1;//next-1一定大於current,不需要判斷
- }
- break;
- }else{
- //如果沒有開啟DDL隔離,直接將事件加入到entrys中
- entrys.add(event);
- //並將當前新增的event佔用位元組數累加到memsize變數上
- memsize+=calculateSize(event);
- end=next;//記錄end位點
- }
- }
- }
- //3構造PositionRange,表示本次獲取的Event的開始和結束位置
- PositionRange<LogPosition>range=newPositionRange<LogPosition>();
- result.setPositionRange(range);
- //3.1把entrys列表中的第一個event的位置,當做PositionRange的開始位置
- range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
- //3.2把entrys列表中的最後一個event的位置,當做PositionRange的結束位置
- range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size()-1)));
- //4記錄一下是否存在可以被ack的點,逆序迭代獲取到的Event列表
- for(inti=entrys.size()-1;i>=0;i--){
- Eventevent=entrys.get(i);
- //4.1.1如果是事務開始/事務結束/或者dll事件,
- if(CanalEntry.EntryType.TRANSACTIONBEGIN==event.getEntry().getEntryType()
- ||CanalEntry.EntryType.TRANSACTIONEND==event.getEntry().getEntryType()
- ||isDdl(event.getEntry().getHeader().getEventType())){
- //4.1.2將其設定為可被ack的點,並跳出迴圈
- range.setAck(CanalEventUtils.createPosition(event));
- break;
- }
- //4.1.3如果沒有這三種類型事件,意味著沒有可被ack的點
- }
- //5累加getMemSize值,getMemSize值
- //5.1通過AtomLong的compareAndSet嘗試增加getSequence值
- if(getSequence.compareAndSet(current,end)){//如果成功,累加getMemSize
- getMemSize.addAndGet(memsize);
- //如果之前有put操作因為佇列滿了而被阻塞,這裡傳送訊號,通知佇列已經有空位置,下面還要進行說明
- notFull.signal();
- returnresult;
- }else{//如果失敗,直接返回空事件列表
- returnnewEvents<Event>();
- }
- }
補充說明:
1 Get資料時,會通過isDdl
方法判斷event是否是ddl型別。
MemoryEventStoreWithBuffer#isDdl
- privatebooleanisDdl(EventTypetype){
- returntype==EventType.ALTER||type==EventType.CREATE||type==EventType.ERASE
- ||type==EventType.RENAME||type==EventType.TRUNCATE||type==EventType.CINDEX
- ||type==EventType.DINDEX;
- }
這裡的EventType是在protocol模組中定義的,並非mysql binlog event結構中的event type。在原始的mysql binlog event型別中,有一個QueryEvent,裡面記錄的是執行的sql語句,canal通過對這個sql語句進行正則表示式匹配,判斷出這個event是否是DDL語句(參見SimpleDdlParser
#parse方法)。
2 獲取到event列表之後,會構造一個PostionRange
物件。
通過CanalEventUtils.createPosition方法計算出第一、最後一個event的位置,作為PostionRange的開始和結束。
事實上,parser模組解析後,已經將位置資訊:binlog檔案,position封裝到了Event中,createPosition方法只是將這些資訊提取出來。
CanalEventUtils#createPosition
- publicstaticLogPositioncreatePosition(Eventevent){
- //=============建立一個EntryPosition例項,提取event中的位置資訊============
- EntryPositionposition=newEntryPosition();
- //event所在的binlog檔案
- position.setJournalName(event.getEntry().getHeader().getLogfileName());
- //event鎖在binlog檔案中的位置
- position.setPosition(event.getEntry().getHeader().getLogfileOffset());
- //event的建立時間
- position.setTimestamp(event.getEntry().getHeader().getExecuteTime());
- //event是mysql主從叢集哪一個例項上生成的,一般都是主庫,如果從庫沒有配置read-only,那麼serverId也可能是從庫
- position.setServerId(event.getEntry().getHeader().getServerId());
- //===========將EntryPosition例項封裝到一個LogPosition物件中===============
- LogPositionlogPosition=newLogPosition();
- logPosition.setPostion(position);
- //LogIdentity中包含了這個event來源的mysql實力的ip地址資訊
- logPosition.setIdentity(event.getLogIdentity());
- returnlogPosition;
- }
3 獲取到Event列表後,會從中逆序尋找第一個型別為"事務開始/事務結束/DDL"的Event,將其位置作為PostionRange的可ack位置。
mysql原生的binlog事件中,總是以一個內容”BEGIN”的QueryEvent作為事務開始,以XidEvent事件表示事務結束。即使我們沒有顯式的開啟事務,對於單獨的一個更新語句(如Insert、update、delete),mysql也會預設開啟事務。而canal將其轉換成更容易理解的自定義EventType型別:TRANSACTIONBEGIN、TRANSACTIONEND。
而將這些事件作為ack點,主要是為了保證事務的完整性。例如client一次拉取了10個binlog event,前5個構成一個事務,後5個還不足以構成一個完整事務。在ack後,如果這個client停止了,也就是說下一個事務還沒有被完整處理完。儘管之前ack的是10條資料,但是client重新啟動後,將從第6個event開始消費,而不是從第11個event開始消費,因為第6個event是下一個事務的開始。
具體邏輯在於,canal server在接受到client ack後,CanalServerWithEmbedded#ack方法會執行。其內部首先根據ack的batchId找到對應的PositionRange,再找出其中的ack點,通過CanalMetaManager將這個位置記錄下來。之後client重啟後,再把這個位置資訊取出來,從這個位置開始消費。
也就是說,ack位置實際上提供給CanalMetaManager使用的。而對於MemoryEventStoreWithBuffer本身而言,也需要進行ack,用於將已經消費的資料從佇列中清除,從而騰出更多的空間存放新的資料。
4.4 ack操作
相對於get操作和put操作,ack操作沒有過載,只有一個ack方法,用於清空指定position之前的資料,如下:
MemoryEventStoreWithBuffer#ack
- publicvoidack(Positionposition)throwsCanalStoreException{
- cleanUntil(position);
- }
CanalStoreScavenge介面定義了2個方法:cleanAll和cleanUntil。前面我們已經看到了在stop時,cleanAll方法會被執行。而每次ack時,cleanUntil
方法會被執行,這個方法實現如下所示:
MemoryEventStoreWithBuffer#cleanUntil
- //postion表示要ack的配置
- publicvoidcleanUntil(Positionposition)throwsCanalStoreException{
- finalReentrantLocklock=this.lock;
- lock.lock();
- try{
- //獲得當前ack值
- longsequence=ackSequence.get();
- //獲得當前get值
- longmaxSequence=getSequence.get();
- booleanhasMatch=false;
- longmemsize=0;
- //迭代所有未被ack的event,從中找出與需要ack的position相同位置的event,清空這個event之前的所有資料。
- //一旦找到這個event,迴圈結束。
- for(longnext=sequence+1;next<=maxSequence;next++){
- Eventevent=entries[getIndex(next)];//獲得要ack的event
- memsize+=calculateSize(event);//計算當前要ack的event佔用位元組數
- booleanmatch=CanalEventUtils.checkPosition(event,(LogPosition)position);
- if(match){//找到對應的position,更新ackseq
- hasMatch=true;
- if(batchMode.isMemSize()){//如果batchMode是MEMSIZE
- ackMemSize.addAndGet(memsize);//累加ackMemSize
- //嘗試清空buffer中的記憶體,將ack之前的記憶體全部釋放掉
- for(longindex=sequence+1;index<next;index++){
- entries[getIndex(index)]=null;//設定為null
- }
- }
- //累加ack值
- //官方註釋說,採用compareAndSet,是為了避免併發ack。我覺得根本不會併發ack,因為都加鎖了
- if(ackSequence.compareAndSet(sequence,next)){
- notFull.signal();//如果之前存在put操作因為佇列滿了而被阻塞,通知其佇列有了新空間
- return;
- }
- }
- }
- if(!hasMatch){//找不到對應需要ack的position
- thrownewCanalStoreException("nomatchackposition"+position.toString());
- }
- }finally{
- lock.unlock();
- }
- }
在匹配尚未ack的Event,是否有匹配的位置時,呼叫了CanalEventUtils#checkPosition
方法。其內部:
-
首先比較Event的生成時間
-
接著,如果位置資訊的binlog檔名或者資訊不為空的話(通常不為空),則會進行精確匹配
CanalEventUtils#checkPosition
- /**
- *判斷當前的entry和position是否相同
- */
- publicstaticbooleancheckPosition(Eventevent,LogPositionlogPosition){
- EntryPositionposition=logPosition.getPostion();
- CanalEntry.Entryentry=event.getEntry();
- //匹配時間
- booleanresult=position.getTimestamp().equals(entry.getHeader().getExecuteTime());
- //判斷是否需要根據:binlog檔案+position進行比較
- booleanexactely=(StringUtils.isBlank(position.getJournalName())&&position.getPosition()==null);
- if(!exactely){//精確匹配
- result&=StringUtils.equals(entry.getHeader().getLogfileName(),position.getJournalName());
- result&=position.getPosition().equals(entry.getHeader().getLogfileOffset());
- }
- returnresult;
- }
4.5 rollback操作
相對於put/get/ack操作,rollback操作簡單了很多。所謂rollback,就是client已經get到的資料,沒能消費成功,因此需要進行回滾。回滾操作特別簡單,只需要將getSequence的位置重置為ackSequence,將getMemSize設定為ackMemSize即可。
- publicvoidrollback()throwsCanalStoreException{
- finalReentrantLocklock=this.lock;
- lock.lock();
- try{
- getSequence.set(ackSequence.get());
- getMemSize.set(ackMemSize.get());
- }finally{
- lock.unlock();
- }
- }
4.6 其他方法
除了上述提到的所有方法外,MemoryEventStoreWithBuffer還提供了getFirstPosition()
和getLatestPosition()
方法,分別用於獲取當前佇列中的第一個和最後一個Event的位置資訊。前面已經提到,在CanalServerWithEmbedded中會使用getFirstPosition()方法來獲取CanalEventStore中儲存的第一個Event的位置,而getLatestPosition()只是在一些單元測試中使用到,因此在這裡我們只分析getFirstPosition()方法。
第一條資料通過ackSequence當前值對應的Event來確定,因為更早的Event在ack後都已經被刪除了。相關原始碼如下:
MemoryEventStoreWithBuffer#getFirstPosition
- //獲取第一條資料的position,如果沒有資料返回為null
- publicLogPositiongetFirstPosition()throwsCanalStoreException{
- finalReentrantLocklock=this.lock;
- lock.lock();
- try{
- longfirstSeqeuence=ackSequence.get();
- //1沒有ack過資料,且佇列中有資料
- if(firstSeqeuence==INIT_SQEUENCE&&firstSeqeuence<putSequence.get()){
- //沒有ack過資料,那麼ack為初始值-1,又因為佇列中有資料,因此ack+1,即返回佇列中第一條資料的位置
- Eventevent=entries[getIndex(firstSeqeuence+1)];
- returnCanalEventUtils.createPosition(event,false);
- //2已經ack過資料,但是未追上put操作
- }elseif(firstSeqeuence>INIT_SQEUENCE&&firstSeqeuence<putSequence.get()){
- //返回最後一次ack的位置資料+1
- Eventevent=entries[getIndex(firstSeqeuence+1)];
- returnCanalEventUtils.createPosition(event,true);
- //3已經ack過資料,且已經追上put操作,說明佇列中所有資料都被消費完了
- }elseif(firstSeqeuence>INIT_SQEUENCE&&firstSeqeuence==putSequence.get()){
- //最後一次ack的位置資料,和last為同一條
- Eventevent=entries[getIndex(firstSeqeuence)];
- returnCanalEventUtils.createPosition(event,false);
- //4沒有任何資料,返回null
- }else{
- returnnull;
- }
- }finally{
- lock.unlock();
- }
- }
程式碼邏輯很簡單,唯一需要關注的是,通過CanalEventUtils#createPosition(Event, boolean)方法來計算第一個Event的位置,返回的是一個LogPosition
物件。其中boolean引數用LogPosition內部維護的EntryPosition
的included屬性賦值。在前面get方法原始碼分析時,我們已經看到,當included值為false時,會把當前get位置+1,然後開始獲取Event;當為true時,則直接從當前get位置開始獲取資料。
6.0 filter模組
2018-11-03 01:16:489,17311 Filter模組簡介
filter模組用於對binlog進行過濾。在實際開發中,一個mysql例項中可能會有多個庫,每個庫裡面又會有多個表,可能我們只是想訂閱某個庫中的部分表,這個時候就需要進行過濾。也就是說,parser模組解析出來binlog之後,會進行一次過濾之後,才會儲存到store模組中。
過濾規則的配置既可以在canal服務端進行,也可以在客戶端進行。
1.1 服務端配置
我們在配置一個canal instance時,在instance.properties中有以下兩個配置項:
其中:
canal.instance.filter.regex用於配置白名單,也就是我們希望訂閱哪些庫,哪些表,預設值為.*\\..*,也就是訂閱所有庫,所有表。
canal.instance.filter.black.regex用於配置黑名單,也就是我們不希望訂閱哪些庫,哪些表。沒有預設值,也就是預設黑名單為空。
需要注意的是,在過濾的時候,會先根據白名單進行過濾,再根據黑名單過濾。意味著,如果一張表在白名單和黑名單中都出現了,那麼這張表最終不會被訂閱到,因為白名單通過後,黑名單又將這張表給過濾掉了。
另外一點值得注意的是,過濾規則使用的是perl正則表示式,而不是jdk自帶的正則表示式。意味著filter模組引入了其他依賴,來進行匹配。具體來說,filter模組的pom.xml中包含以下兩個依賴:
- <dependency>
- <groupId>com.googlecode.aviator</groupId>
- <artifactId>aviator</artifactId>
- </dependency>
- <dependency>
- <groupId>oro</groupId>
- <artifactId>oro</artifactId>
- </dependency>
其中:
aviator:是一個開源的、高效能、輕量級的 java 語言實現的表示式求值引擎
oro:全稱為Jakarta ORO,最全面以及優化得最好的正則表示式API之一,Jakarta-ORO庫以前叫做OROMatcher,是由DanielF. Savarese編寫,後來捐贈給了apache Jakarta Project。canal的過濾規則就是通過oro中的Perl5Matcher來進行完成的。
顯然,對於filter模組的原始碼解析,實際上主要變成了對aviator、oro的分析。
這一點,我們可以從filter模組核心介面CanalEventFilter
的實現類中得到驗證。CanalEventFilter介面定義了一個filter方法:
- publicinterfaceCanalEventFilter<T>{
- booleanfilter(Tevent)throwsCanalFilterException;
- }
目前針對CanalEventFilter提供了3個實現類,都是基於開源的java表示式求值引擎Aviator,如下:
提示:這個3個實現都是以Aviater開頭,應該是拼寫錯誤,正確的應該是Aviator。
其中:
-
AviaterELFilter:基於Aviator el表示式的匹配過濾
-
AviaterSimpleFilter:基於Aviator進行tableName簡單過濾計算,不支援正則匹配
-
AviaterRegexFilter
:基於Aviator進行tableName正則匹配的過濾演算法。內部使用到了一個RegexFunction類,這是對Aviator自定義的函式的擴充套件,內部使用到了oro中的Perl5Matcher來進行正則匹配。
需要注意的是,儘管filter模組提供了3個基於Aviator的過濾器實現,但是實際上使用到的只有AviaterRegexFilter。這一點可以在canal-deploy模組提供的xxx-instance.xml配置檔案中得要驗證。以default-instance.xml為例,eventParser這個bean包含以下兩個屬性:
- <beanid="eventParser"class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
- <!--...-->
- <!--解析過濾處理-->
- <propertyname="eventFilter">
- <beanclass="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter">
- <constructor-argindex="0"value="${canal.instance.filter.regex:.*\..*}"/>
- </bean>
- </property>
- <propertyname="eventBlackFilter">
- <beanclass="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter">
- <constructor-argindex="0"value="${canal.instance.filter.black.regex:}"/>
- <constructor-argindex="1"value="false"/>
- </bean>
- </property>
- <!--...-->
- </bean>
其中:
-
eventFilter屬性:使用配置項canal.instance.filter.regex的值進行白名單過濾。
-
eventBlackFilter屬性:使用配置項canal.instance.filter.black.regex進行黑名單過濾。
這兩個屬性的值都是通過一個內部bean的方式進行配置,型別都是AviaterRegexFilter。由於其他兩個型別的CanalEventFilter實現在parser模組中並沒有使用到,因此後文中,我們也只會對AviaterRegexFilter進行分析。
前面提到,parser模組在過濾的時候,會先根據canal.instance.filter.regex進行白名單過濾,再根據canal.instance.filter.black.regex進行黑名單過濾。到這裡,實際上就是先通過eventFilter進行擺明但過濾,通過eventBlackFilter進行黑名單過濾。
parser模組實際上會將eventFilter、eventBlackFilter設定到一個LogEventConvert
物件中,這個物件有2個方法:parseQueryEvent和parseRowsEvent都進行了過濾。以parseRowsEvent方法為例:
com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert#parseRowsEvent(省略部分程式碼片段)
- privateEntryparseRowsEvent(RowsLogEventevent){
- ...
- TableMapLogEventtable=event.getTable();
- Stringfullname=table.getDbName()+"."+table.getTableName();
- //checknamefilter
- if(nameFilter!=null&&!nameFilter.filter(fullname)){
- returnnull;
- }
- if(nameBlackFilter!=null&&nameBlackFilter.filter(fullname)){
- returnnull;
- }
- ...
- }
這裡的nameFilter、nameBlackFilter實際上就是我們設定到parser中的 eventFilter、eventBlackFilter,只不過parser將其設定到LogEventConvert物件中換了一個名字。
可以看到,的確是先使用nameFilter進行白名單過濾,再使用nameBlackFilter進行黑名單過濾。在過濾時,使用dbName+"."+tableName
作為引數,進行過濾。如果被過濾掉了,就返回null。
再次提醒,由於黑名單後過濾,因此如果希望訂閱一個表,一定不要在黑名單中出現。
1.2 客戶端配置
上面提到的都是服務端配置。canal也支援客戶端配置過濾規則。舉例來說,假設一個庫有10張表,一個client希望訂閱其中5張表,另一個client希望訂閱另5張表。此時,服務端可以訂閱10張表,當client來消費的時候,根據client的過濾規則只返回給對應的binlog event。
客戶端指定過濾規則通過client模組中的CanalConnector
的subscribe
方法來進行,subscribe有兩種過載形式,如下:
- //對於第一個subscribe方法,不指定filter,以服務端的filter為準
- voidsubscribe()throwsCanalClientException;
- //指定了filter:
- //如果本次訂閱中filter資訊為空,則直接使用canalserver服務端配置的filter資訊
- //如果本次訂閱中filter資訊不為空,目前會直接替換canalserver服務端配置的filter資訊,以本次提交的為準
- voidsubscribe(Stringfilter)throwsCanalClientException;
通過不同client指定不同的過濾規則,可以達到服務端一份資料供多個client進行訂閱消費的效果。
然而,想法是好的,現實確是殘酷的,由於目前一個canal instance只允許一個client訂閱,因此目前還達不到這種效果。讀者明白這種設計的初衷即可。
最後列出filter模組的目錄結構,這個模組的類相當的少,如下:
到此,filter模組的主要作用已經講解完成。接著應該針對AviaterRegexFilter進行原始碼分析,由於其基於Aviator和oro基礎之上編寫,因此先對Aviator和oro進行介紹。
2 Aviator快速入門
說明,這裡關於Aviator的相關內容直接摘錄自官網:https://github.com/killme2008/aviator,並沒有包含Aviator所有內容,僅僅是就canal內部使用到的一些特性進行講解。
Aviator是一個高效能、輕量級的 java 語言實現的表示式求值引擎, 主要用於各種表示式的動態求值。現在已經有很多開源可用的 java 表示式求值引擎,為什麼還需要 Avaitor 呢?
Aviator的設計目標是輕量級和高效能,相比於Groovy、JRuby的笨重, Aviator非常小, 加上依賴包也才 537K,不算依賴包的話只有 70K; 當然, Aviator的語法是受限的, 它不是一門完整的語言, 而只是語言的一小部分集合。
其次, Aviator的實現思路與其他輕量級的求值器很不相同, 其他求值器一般都是通過解釋的方式執行, 而Aviator則是直接將表示式編譯成 JVM 位元組碼, 交給 JVM 去執行。簡單來說, Aviator的定位是介於 Groovy 這樣的重量級指令碼語言和 IKExpression 這樣的輕量級表示式引擎之間。
Aviator 的特性:
-
支援絕大多數運算操作符,包括算術操作符、關係運算符、邏輯操作符、位運算子、正則匹配操作符(=~)、三元表示式(?:)
-
支援操作符優先順序和括號強制設定優先順序
-
邏輯運算子支援短路運算。
-
支援豐富型別,例如nil、整數和浮點數、字串、正則表示式、日期、變數等,支援自動型別轉換。
-
內建一套強大的常用函式庫
-
可自定義函式,易於擴充套件
-
可過載操作符
-
支援大數運算(BigInteger)和高精度運算(BigDecimal)
-
效能優秀
引入Aviator, 從 3.2.0 版本開始, Aviator 僅支援 JDK 7 及其以上版本。 JDK 6 請使用 3.1.1 這個穩定版本。
- <dependency>
- <groupId>com.googlecode.aviator</groupId>
- <artifactId>aviator</artifactId>
- <version>{version}</version>
- </dependency>
注意:canal 1.0.24 中使用的是Aviator 2.2.1版本。
Aviator的使用都是集中通過com.googlecode.aviator.AviatorEvaluator
這個入口類來處理。在canal提供的AviaterRegexFilter中,僅僅使用到了Aviator部分功能,我們這裡也僅僅就這些功能進行講解。
2.1 編譯表示式
參考:https://github.com/killme2008/aviator/wiki#%E7%BC%96%E8%AF%91%E8%A1%A8%E8%BE%BE%E5%BC%8F
案例:
- publicclassTestAviator{
- publicstaticvoidmain(String[]args){
- //1、定義一個字串表示式
- Stringexpression="a-(b-c)>100";
- //2、對錶達式進行編譯,得到Expression物件例項
- ExpressioncompiledExp=AviatorEvaluator.compile(expression);
- //3、準備計算表示式需要的引數
- Map<String,Object>env=newHashMap<String,Object>();
- env.put("a",100.3);
- env.put("b",45);
- env.put("c",-199.100);
- //4、執行表示式,通過呼叫Expression的execute方法
- Booleanresult=(Boolean)compiledExp.execute(env);
- System.out.println(result);//false
- }
- }
通過compile
方法可以將表示式編譯成Expression
的中間物件, 當要執行表示式的時候傳入env並呼叫Expression的execute方法即可。 表示式中使用了括號來強制優先順序, 這個例子還使用了>用於比較數值大小, 比較運算子!=、==、>、>=、<、<=不僅可以用於數值, 也可以用於String、Pattern、Boolean等等, 甚至是任何使用者傳入的兩個都實現了java.lang.Comparable介面的物件之間。
編譯後的結果你可以自己快取, 也可以交給 Aviator 幫你快取, AviatorEvaluator內部有一個全域性的快取池, 如果你決定快取編譯結果, 可以通過:
- publicstaticExpressioncompile(Stringexpression,booleancached)
將cached設定為true即可, 那麼下次編譯同一個表示式的時候將直接返回上一次編譯的結果。
使快取失效通過以下方法:
- publicstaticvoidinvalidateCache(Stringexpression)
2.2 自定義函式
參考:https://github.com/killme2008/aviator/wiki#%E8%87%AA%E5%AE%9A%E4%B9%89%E5%87%BD%E6%95%B0
Aviator 除了內建的函式之外,還允許使用者自定義函式,只要實現com.googlecode.aviator.runtime.type.AviatorFunction
介面, 並註冊到AviatorEvaluator即可使用. AviatorFunction介面十分龐大, 通常來說你並不需要實現所有的方法, 只要根據你的方法的參 數個數, 繼承AbstractFunction
類並override相應方法即可。
可以看一個例子,我們實現一個add函式來做數值的相加:
- //1、自定義函式AddFunction,繼承AbstractFunction,覆蓋其getName方法和call方法
- classAddFunctionextendsAbstractFunction{
- //1.1getName用於返回函式的名字,之後需要使用這個函式時,達表示需要以add開頭
- publicStringgetName(){
- return"add";
- }
- //1.2在執行計算時,call方法將會被回撥。call方法有多種過載形式,引數可以分為2類:
- //第一類:所有的call方法的第一個引數都是Map型別的env引數。
- //第二類:不同數量的AviatorObject引數。由於在這裡我們的add方法只接受2個引數,
- //所以覆蓋接受2個AviatorObject引數call方法過載形式
- //使用者在執行時,通過"函式名(引數1,引數2,...)"方式執行函式,如:"add(1,2)"
- @Override
- publicAviatorObjectcall(Map<String,Object>env,AviatorObjectarg1,AviatorObjectarg2){
- Numberleft=FunctionUtils.getNumberValue(arg1,env);
- Numberright=FunctionUtils.getNumberValue(arg2,env);
- returnnewAviatorDouble(left.doubleValue()+right.doubleValue());
- }
- }
- publicclassTestAviator{
- publicstaticvoidmain(String[]args){
- //註冊函式
- AviatorEvaluator.addFunction(newAddFunction());
- System.out.println(AviatorEvaluator.execute("add(1,2)"));//3.0
- System.out.println(AviatorEvaluator.execute("add(add(1,2),100)"));//103.0
- }
- }
註冊函式通過AviatorEvaluator.addFunction方法, 移除可以通過removeFunction。另外, FunctionUtils 提供了一些方便引數型別轉換的方法。
3 AviaterRegexFilter原始碼解析
AviaterRegexFilter實現了CanalEventParser介面,主要是實現其filter方法對binlog進行過濾。
首先對AviaterRegexFilter中定義的欄位和構造方法進行介紹:
com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter
- publicclassAviaterRegexFilterimplementsCanalEventFilter<String>{
- //我們的配置的binlog過濾規則可以由多個正則表示式組成,使用逗號”,"進行分割
- privatestaticfinalStringSPLIT=",";
- //將經過逗號",”分割後的過濾規則重新使用|串聯起來
- privatestaticfinalStringPATTERN_SPLIT="|";
- //canal定義的Aviator過濾表示式,使用了regex自定義函式,接受pattern和target兩個引數
- privatestaticfinalStringFILTER_EXPRESSION="regex(pattern,target)";
- //regex自定義函式實現,RegexFunction的getName方法返回regex,call方法接受兩個引數
- privatestaticfinalRegexFunctionregexFunction=newRegexFunction();
- //對自定義表示式進行編譯,得到Expression物件
- privatefinalExpressionexp=AviatorEvaluator.compile(FILTER_EXPRESSION,true);
- static{
- //將自定義函式新增到AviatorEvaluator中
- AviatorEvaluator.addFunction(regexFunction);
- }
- //用於比較兩個字串的大小
- privatestaticfinalComparator<String>COMPARATOR=newStringComparator();
- //使用者設定的過濾規則,需要使用SPLIT進行分割
- finalprivateStringpattern;
- //在沒有指定過濾規則pattern情況下的預設值,例如預設為true,表示使用者不指定過濾規則情況下,總是返回所有的binlogevent
- finalprivatebooleandefaultEmptyValue;
- publicAviaterRegexFilter(Stringpattern){
- this(pattern,true);
- }
- //構造方法
- publicAviaterRegexFilter(Stringpattern,booleandefaultEmptyValue){
- //1給defaultEmptyValue欄位賦值
- this.defaultEmptyValue=defaultEmptyValue;
- //2、給pattern欄位賦值
- //2.1將傳入pattern以逗號",”進行分割,放到list中;如果沒有指定pattern,則list為空,意味著不需要過濾
- List<String>list=null;
- if(StringUtils.isEmpty(pattern)){
- list=newArrayList<String>();
- }else{
- String[]ss=StringUtils.split(pattern,SPLIT);
- list=Arrays.asList(ss);
- }
- //2.2對list中的pattern元素,按照從長到短的排序
- Collections.sort(list,COMPARATOR);
- //2.3對pattern進行頭尾完全匹配
- list=completionPattern(list);
- //2.4將過濾規則重新使用|串聯起來賦值給pattern
- this.pattern=StringUtils.join(list,PATTERN_SPLIT);
- }
- ...
- }
上述程式碼中,2.2 步驟使用了COMPARATOR對list中分割後的pattern進行比較,COMPARATOR的型別是StringComparator,這是定義在AviaterRegexFilter中的一個靜態內部類
- /**
- *修復正則表示式匹配的問題,因為使用了oro的matches,會出現:
- *foo|foot匹配foot出錯,原因是foot匹配了foo之後,會返回foo,但是foo的長度和foot的長度不一樣
- *因此此類對正則表示式進行了從長到短的排序
- */
- privatestaticclassStringComparatorimplementsComparator<String>{
- @Override
- publicintcompare(Stringstr1,Stringstr2){
- if(str1.length()>str2.length()){
- return-1;
- }elseif(str1.length()<str2.length()){
- return1;
- }else{
- return0;
- }
- }
- }
上述程式碼2.3節呼叫completionPattern(list)方法對list中分割後的pattern進行頭尾完全匹配
- /**
- *修復正則表示式匹配的問題,即使按照長度遞減排序,還是會出現以下問題:
- *foooo|f.*t匹配fooooot出錯,原因是fooooot匹配了foooo之後,會將fooo和資料進行匹配,
- *但是foooo的長度和fooooot的長度不一樣,因此此類對正則表示式進行頭尾完全匹配
- */
- privateList<String>completionPattern(List<String>patterns){
- List<String>result=newArrayList<String>();
- for(Stringpattern:patterns){
- StringBufferstringBuffer=newStringBuffer();
- stringBuffer.append("^");
- stringBuffer.append(pattern);
- stringBuffer.append("$");
- result.add(stringBuffer.toString());
- }
- returnresult;
- }
- }
filter方法
AviaterRegexFilter類中最重要的就是filter方法,由這個方法執行過濾,如下:
- //1引數:前面已經分析過parser模組的LogEventConvert中,會將binlogevent的dbName+”."+tableName當做引數過濾
- publicbooleanfilter(Stringfiltered)throwsCanalFilterException{
- //2如果沒有指定匹配規則,返回預設值
- if(StringUtils.isEmpty(pattern)){
- returndefaultEmptyValue;
- }
- //3如果需要過濾的dbName+”.”+tableName是一個空串,返回預設值
- //提示:一些型別的binlogevent,如heartbeat,並不是真正修改資料,這種型別的event是沒有庫名和表名的
- if(StringUtils.isEmpty(filtered)){
- returndefaultEmptyValue;
- }
- //4將傳入的dbName+”."+tableName通過canal自定義的Aviator擴充套件函式RegexFunction進行計算
- Map<String,Object>env=newHashMap<String,Object>();
- env.put("pattern",pattern);
- env.put("target",filtered.toLowerCase());
- return(Boolean)exp.execute(env);
- }
第4步通過exp.execute方法進行過濾判斷,前面已經看到,exp這個Expression例項是通過"regex(pattern,target)"編譯得到。根據前面對AviatorEvaluator的介紹,其應該呼叫一個名字為regex的Aviator自定義函式,這個函式接受2個引數。
RegexFunction
的實現如下所示:
com.alibaba.otter.canal.filter.aviater.RegexFunction
- publicclassRegexFunctionextendsAbstractFunction{
- publicAviatorObjectcall(Map<String,Object>env,AviatorObjectarg1,AviatorObjectarg2){
- Stringpattern=FunctionUtils.getStringValue(arg1,env);
- Stringtext=FunctionUtils.getStringValue(arg2,env);
- Perl5Matchermatcher=newPerl5Matcher();
- booleanisMatch=matcher.matches(text,PatternUtils.getPattern(pattern));
- returnAviatorBoolean.valueOf(isMatch);
- }
- publicStringgetName(){
- return"regex";
- }
- }
可以看到,在這個函式裡面,實際上是根據配置的過濾規則pattern,以及需要過濾的內容text(即dbName+”.”+tableName),通過jarkata-oro中Perl5Matcher
類進行正則表示式匹配。