MaxCompute(原ODPS) 事件(Event)機制
免費開通大數據服務:https://www.aliyun.com/product/odps
什麽是MaxCompute
大數據計算服務(MaxCompute,原名ODPS)是一種快速、完全托管的TB/PB級數據倉庫解決方案。MaxCompute向用戶提供了完善的數據導入方案以及多種經典的分布式計算模型,能夠更快速的解決用戶海量數據計算問題,有效降低企業成本,並保障數據安全。
什麽是 MaxCompute事件機制
MaxCompute event 用於監控表和實例等MaxCompute資源(目前只用於監控表)。當表狀態發生變化時,MaxCompute 會向預先註冊(訂閱)的 uri 發送信息。Event通知只有在訂閱Event之後才能收到。每個project中可以訂閱一個或多個Event。Event是用戶的數據,同表數據一樣,創建或修改時都需要有這個Project的操作權限。關於Event的Restful Api,在文章裏有介紹。
為什麽需要 MaxCompute 事件機制
考慮以下場景:當一個用戶 A 關心某一個表 T 的操作(創建/刪除/插入/修改/...)時,如果表 T 不是用戶 A 創建的,那麽用戶 A 可以采用什麽方法感知這個操作?一個方法是主動輪詢這個表是否做了某個操作,但是缺點是不言而喻的。另一個方法是,註冊一個回調,當表被操作時,被動接受通知。用這種方法可以使用戶邏輯不必輪詢和等待對表的操作。
MaxCompute Event機制就是第二種方法的實現。
在實際的生產中,對以上應用場景有大量的需求,並已經形成了對MaxCompute Event豐富的應用,例如:
數據地圖: 訂閱了一些 project 的 Event,並根據 Event 通知展示這些 project 中表的元數據。
跨集群復制: 監聽 Event 通知以復制相應的表。
螞蟻金服: 依賴事件通知機制進行工作流管理,統計,授權等工作。 事實上,每個 project 都有大量用戶訂閱了所屬project的表以及其它project表的事件通知。
MaxCompute 事件機制是怎樣實現的
本節首先將 MaxCompute 事件機制 作為一個黑盒,從用戶的角度介紹其功能和使用方法。而後以此為切入點,深入剖析 MaxCompute 事件機制的內部機理。最後,提出一些對當前事件機制的思考。
訂閱(註冊)一個事件 & 事件通知
在網絡編程中,為了減輕多線程的壓力,往往使用事件通知驅動的異步編程。如,libevent[2]。使用這個庫編寫一個服務器程序,可以這樣做:
void on_accept(int sock, short event, void* arg);
int main(int argc, char* argv[])
{
// create socket s
struct sockadddr_in addrin;
int s = socket(AF_INET, SOCK_STREAM, 0);
BOOL bReuseaddr=TRUE;
setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));
memset(&addrin, 0, sizeof(addrin));
addrin.sin_family = AF_INET;
addrin.sin_port = htons(PORT);
addrin.sin_addr.s_addr = INADDR_ANY;
bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));
listen(s, BACKLOG);
// 創建事件池 event base
struct event_base* eb = event_base_new();
// 創建事件 & 綁定回調
struct event e;
event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);
// 註冊事件
event_base_set(eb, &e);
event_add(&e, NULL);
// 啟動事件派發
event_base_dispatch(eb);
return 0;
}
抽取出上面事件通知邏輯的主線:創建事件池,創建一個 event 並綁定回調函數, 把 event 註冊到事件池並啟動事件派發器。
在這個過程中,事件生產者是socket(嚴格說是綁定在這個socket上的事件多路復用接口,如epoll),事件中轉者是libevent中的事件池(event base)和事件派發器,事件消費者是事件處理回調函數。
同樣的過程適用於MaxCompute event。事件池和派發器不需要用戶創建。用戶首先創建一個事件,然後綁定回調處理邏輯,最後把事件註冊到事件池。代碼如下:
public class TestOdpsEvent {
/**
* 創建事件方法
*/
static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {
Event event = new Event();
event.setName(eventName); // 指定事件名稱
event.setComment(comment); // 事件註釋
event.setType(Event.SourceType.TABLE); // 指定事件類型,目前支持 TABLE
Event.Config config = new Event.Config();
config.setName("source");
config.setValue(tableName); // 指定事件源(即 表名). "*" 表示所有表.
event.setConfig(config);
event.setUri(new URI(callbackUri)); // 指定了一個回調地址
return event;
}
public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {
Account account = new AliyunAccount("xxxxxx", "xxxxxx");
Odps odps = new Odps(account);
String odps_endpoint = "http://xxxx/xxx";
odps.setEndpoint(odps_endpoint);
odps.setDefaultProject("project1");
InternalOdps iodps = new InternalOdps(odps);
// 創建事件 & 綁定回調
String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint
Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");
// 註冊事件
iodps.events().create(e);
// 查看已創建事件
Iterator<Event> events = iodps.events().iterator();
while(events.hasNext()) {
Event event1 = events.next();
System.out.println("Event found: " + event1.getName());
System.out.println("Event uri: " + event1.getUri());
// iodps.events().delete(event1.getName()); // 刪除事件
}
}
}
在上面的代碼中,指定了一個回調地址。當表發生變化時,就會通知這個回調地址。用戶根據在這個回調地址接收到事件通知,使用相應的處理邏輯處理。事件回調地址作為事件處理邏輯入口,支持多種協議,包括但不限於kuafu, http, https等。與libevent不同的是,MaxCompute event的生產者,中轉者和消費者可以位於不同網絡區域。在用戶註冊事件之後,MaxCompute event機制會在該事件發生後立即通知用戶註冊的回調地址。
剖析 MaxCompute 事件機制
圖3-1的三個部分分別表示了註冊事件,轉發通知,刪除事件的過程。MessageService是 MaxCompute 內部消息服務,作用是轉發事件通知到用戶註冊的回調地址。為方便理解,把 Create topic, Create subscription, Add endpoint 看作註冊事件在消息服務層的三個操作。事件機制在消息服務層具體的實現將在後邊介紹。
圖3-1: 事件創建,轉發,刪除
在圖3-1註冊事件的過程中,用戶的請求由 OdpsWorker 的 createEventHandler 處理。createEventHandler 依次檢查相應的 MessageService topic,subscription,endpoint 是否存在,如果不存在,創建。
在圖3-1刪除事件的過程相對簡單,用戶的請求由 OdpsWorker 的 deleteEventHandler 處理。deleteEventHandler 直接刪除相應的 MessageService subscription。
在圖3-1轉發事件通知的過程中,事件的生產者主要是ddl task(事實上,由於歷史原因,還有HiveServer,CREATETABLE 事件從這裏發出),當執行對表的meta相關的操作時,就會觸發ddl task。如drop table, add partition, insert into partition等。ddl task 會發送相應操作的事件通知。事件通知發送給事件中轉者——消息服務。消息服務將這個事件通知發送給相應事件綁定的回調地址。
消息服務作為事件中轉者,主要完成如下功能:
1)作為事件池維護不同事件和回調地址的對應關系(一個事件對應一個或多個回調地址)
2)作為事件派發器根據事件通知匹配相應事件並將該事件通知轉發到對應事件的各個回調地址。
目前不同的事件依據兩個屬性區分:project名和事件源。事件源目前是表名。在ddl task發出的事件通知中,包含了這兩個關鍵信息。消息服務根據這兩個信息匹配相應事件。在介紹消息服務匹配的實現方式時,首先需要了解 MaxCompute消息服務 的基本概念(為便於理解,本文簡化了消息服務的一些概念,如隱藏了partition概念。在文章[3]中,具體介紹了消息服務的設計和實現)。如圖3-2:
圖3-2: 消息服務基本概念
MaxCompute消息服務包含四個基本概念:topic, subscription, filter, endpoint。消息服務使用了典型的發布訂閱模型。用戶可以創建topic。創建一個或多個subscription(包含一個或多個endpoint)訂閱這個topic。消息發布者向topic發送消息。該消息被轉發到該topic的所有filter匹配的subscription的所有的endpoint。其中,topic的創建者,subscription的創建者,消息的發送者,以及消息的接收者可以是不同的用戶。在創建subscription時,需要指定filter matcher。在消息發送時需要指定filter。當某條消息發送到某個topic時,消息中的filter需要和這個topic的各個subscription的filter matcher匹配,如果匹配成功,將這個消息的一個副本發送給這個subscription的所有endpoint,否則不發送給它們,然後繼續匹配其他的subscription。filter和filter_matcher的示例和匹配規則如下:
filter_matcher | filter | is matched |
"" | "k=v" | yes. If filter_matcher is "", it will match forever. |
"k=v" | "k=v" | yes |
"k=v" | "k=v1" | no |
"k1=v" | "k=v" | no |
"k=v1|v2" | "k=v1" | yes |
"k=v1|v2" | "k=v2" | yes |
"k=v1|v2" | "k=v1|v2" | no. filter's value is 'v1|v2', not 'v1' or 'v2'. |
"k1=v1,k2=v2" | "k1=v1,k2=v2" | yes |
"k1=v1" | "k1=v1,k2=v2" | yes |
"k1=v1,k2=v2" | "k1=v1" | no |
"k=v" | "" | no |
"" | "" | yes. If filter is "", filter_matcher will never hit except its value is "" |
消息服務的這個機制,可以實現上述事件中轉者的功能。將一個事件表達為一個subscription,將一個事件通知表達為一條消息,每個endpoint記錄一個回調地址。每個project對應一個topic,用filter區分事件源。當一個事件通知產生之後,會被發送到產生通知的project所在的topic上。然後,經過匹配,轉發所有的endpoint對應回調地址上。事件通知消息體示例如下:
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>[email protected]</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>CREATETABLE</Reason>
<TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>
<Properties/>
<OdpsMessagerId>1</OdpsMessagerId>
<OdpsMessagerTime>1474208492</OdpsMessagerTime>
</Notification>
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>[email protected]</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>ADDPARTITION</Reason>
<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
<Properties>
<Property>
<Name>Name</Name>
<Value>ds=ds1/pt=pt1</Value>
</Property>
</Properties>
<OdpsMessagerId>4</OdpsMessagerId>
<OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
當用戶訂閱了 Project a_2_test_event 的 Table "backup_partition" 的事件後,當發生對這個表的 CREATETABLE 和 ADDPARTITION 操作後,會接收到上面的兩個事件通知。每個事件通知是一個 xml 格式的消息。SourceType 表示訂閱的是表的事件通知還是其它類型資源的事件通知(目前只支持表)。SourceName 表示訂閱的表的名字。Reason 表示在該表上發生的操作,上例中分別是創建表的操作和增加分區的操作(在 附錄 中列舉了更多的操作類型)。Properties 中會有一些附加的通知屬性,常用來指出操作發生在表的哪個 parition 上。OdpsMessagerId 在一個 Project 的所有表中是唯一的。OdpsMessagerTime 是這條通知產生的時刻。
在MaxCompute線上服務環境中,每一個project p1,就會對應一個名為 SQL_p1的topic(因為歷史原因hardcode了前綴SQL_,不過前綴是什麽無所謂,只要可以區分事件機制的topic和其它應用中的topic就好),這個 topic 在第一次註冊事件的時候自動創建(也可以在創建project時手動創建)。p1的所有事件通知都會發到這個topic上。這個topic在其對應的project刪除時被刪除。
MaxCompute消息服務為事件機制提供了對事件通知的持久化,保序,failover的功能,盡最大努力保證消息不丟,但是依然不能保證絕對不丟。下面分析事件機制可能出現的丟消息情況:事件生產者失敗,消息服務失敗,事件接收者失敗,消息服務熱升級。
1) 事件生產者失敗:在事件通知到達消息服務之前,存在事件通知生產者失敗的可能。具體的消息丟失概率取決於事件生產者的持久化,failover能力以及重試機制。
2) 消息服務失敗:消息服務失敗包括兩種情況:消息到達消息服務前失敗和消息到達之後失敗。如果消息到達之前失敗,那麽消息服務提供的message client會重試3次,每次間隔5毫秒。如果事件成功地發送到消息服務,消息會首先被持久化。在消息服務中的一條消息只有滿足下列兩種情況才會被刪除:a. 消息發送成功;b. 消息發送失敗且超過重試次數(目前重試3600次,每次間隔60秒)。可以看到,事件(消息)丟失最大的風險在於發送到達消息服務之前的一段時間。
3) 事件接收者失敗:如果接收者失敗,且在獲得事件通知之後,處理事件通知之前,消息服務不提供接口使接收者重獲這條消息。當然對於這個問題,還有另一種解決方法,就是使用類似kafka的消息服務模型作為中轉者,可以保證事件通知更強大的可靠性。kafka模型[4]不會主動推送消息,僅僅對消息做持久化以實現高吞吐和高可靠。消息訂閱者需要給定消息id的範圍從某個topic的partition拉消息。當訂閱者失敗,希望重新獲得歷史的消息時,只要給定消息id的範圍,如果這個範圍內的消息沒有過期,就可以被重新獲得。但是kafka模型使用拉消息的模式不具備完整的事件派發器功能,也就不能支持現在odps需要的異步事件通知編程方式。而事實上,MaxCompute消息服務設計的出發點,就是MaxCompute事件通知機制(在沒有消息服務之前,MaxCompute worker履行著消息服務的職責)。
4)消息服務熱升級:雖然說是熱升級,但是新老服務之間切換也是需要時間的。在線上這個時間最誇張的一次達到了4個多小時(所有topic全部切換完成的時間間隔)。而在切換的過程中,新老消息服務中處於切換中間狀態的 topic 是拒絕服務的(切換完成的 topic 可以服務)。
總之,MaxCompute事件通知機制提供了一定程度的高可用保證,但是還沒有把丟消息的概率降低為0,其最大風險在於消息服務不服務。而此時,消息服務上某個 topic 丟失消息的數量和該 topic 不服務的時間成正比。
總結
MaxCompute事件機制給用戶監聽資源的變化帶來了很大的便利。它借鑒了通用的事件異步編程模型,提供了友好的用戶接口,支持了線上數據地圖,跨集群復制等眾多服務。但是,依然有不足之處,例如:
1) 事件監聽(訂閱/註冊)的粒度粗且不可定制:我們曾經接到用戶的需求,想監聽一個表的 CREATETABLE 事件,但是現有的機制只支持監聽到表的級別,這樣用戶就不得不自己過濾這個表的各種事件。
2) 事件機制的可靠性需要進一步提高:曾經出現過熱升級切換消息服務4個小時的情況,原因是其中一個 topic 向某個 endpoint 發送消息卡在了發送那裏一直無法退出,造成該 topic 上丟失大量消息。
3) 消息服務的 生產者qps(生產環境接收消息1000-2000),消費者 qps(消費極限qps未測過,因其取決於) 與 開源消息服務如 kafka 生產者 qps (50,000),消費者 qps (22,000) 依然有一定差距[4]。
對於當前希望使用MaxCompute消息服務的用戶,最好確保滿足以下條件:
1) 允許丟失少量的消息通知,因為的確存在小概率丟消息的可能;
2) 事件處理系統具有一定的事件處理能力,接受事件qps最好可以達到500以上。
3) 不用的事件(回調uri發不通且不再使用)請刪除,否則會在消息服務中留下永久性垃圾,造成消息在pangu中大量堆積,因為消息服務無法判斷用戶的事件是否需要刪除!!!
解決上述的問題目前依然有一些挑戰,但是我們會不斷改進和完善事件機制的各項功能,減小事件丟失率,細化事件訂閱粒度,優化用戶體驗。
參考資料
[1] Event restful api
[2] Libevent: http://libevent.org
[3] Odps Message Service
[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.
附錄
MaxCompute事件機制事件類型列表
在觸發 DDL 時,Event 會向預先註冊的 url 發送 POST 請求,消息體格式如下
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>[email protected]</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>ADDPARTITION</Reason>
<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
<Properties>
<Property>
<Name>Name</Name>
<Value>ds=ds1/pt=pt1</Value>
</Property>
</Properties>
<OdpsMessagerId>4</OdpsMessagerId>
<OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
其中:
Reason 可能取值 | 事件生產者 |
CREATETABLE | hiveserver |
DROPTABLE | ddltask |
ALTERTABLE | ddltask |
ADDPARTITION | ddltask |
DROPPARTITION | ddltask |
ALTERPARTITION | ddltask |
INSERTOVERWRITETABLE | ddltask |
INSERTINTOTABLE | ddltask |
INSERTOVERWRITEPARTITION | ddltask |
INSERTINTOPARTITION | ddltask |
MERGETABLE | ddltask |
MERGEPARTITION | ddltask |
ALTERVOLUMEPARTITION | ddltask |
ADDVOLUMEPARTITION | ddltask |
SourceType 可能取值:Table
使用限制
1) 目前只有 Project Owner 可以創建 event,無法授權給其他人創建 event
2) 接收 post 信息的 url 應返回 http code 200,server 端 post 時並不支持如 302 這樣的跳轉。
原文鏈接
MaxCompute(原ODPS) 事件(Event)機制