日誌服務Flink Connector《支援Exactly Once》
摘要: Flink log connector是阿里雲日誌服務推出的,用於對接Flink的工具,包含兩塊,分別是消費者和生產者,消費者用於從日誌服務中讀資料,支援exactly once語義,生產者用於將資料寫到日誌服務中,該Connector隱藏了日誌服務的一些概念,比如Shard的分裂合併等,使用者在使用時只需要專注在自己的業務邏輯即可。
阿里雲日誌服務是針對實時資料一站式服務,使用者只需要將精力集中在分析上,過程中資料採集、對接各種儲存計算、資料索引和查詢等瑣碎工作等都可以交給日誌服務完成。
日誌服務中最基礎的功能是LogHub,支援資料實時採集與消費,實時消費家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。
Flink Connector
Flink log connector是阿里雲日誌服務提供的,用於對接flink的工具,包括兩部分,消費者(Consumer)和生產者(Producer)。
消費者用於從日誌服務中讀取資料,支援exactly once語義,支援shard負載均衡.
生產者用於將資料寫入日誌服務,使用connector時,需要在專案中新增maven依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId >
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.3</version>
</dependency>
<dependency >
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.10</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>log-loghub-producer</artifactId>
<version>0.1.8</version>
</dependency>
程式碼:Github
用法
請參考日誌服務文件,正確建立Logstore。
如果使用子賬號訪問,請確認正確設定了LogStore的RAM策略。參考授權RAM子使用者訪問日誌服務資源。
1. Log Consumer
在Connector中, 類FlinkLogConsumer提供了訂閱日誌服務中某一個LogStore的能力,實現了exactly once語義,在使用時,使用者無需關心LogStore中shard數
量的變化,consumer會自動感知。
flink中每一個子任務負責消費LogStore中部分shard,如果LogStore中shard發生split或者merge,子任務消費的shard也會隨之改變。
1.1 配置啟動引數
Properties configProps = new Properties();
// 設定訪問日誌服務的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 設定訪問ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 設定日誌服務的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 設定日誌服務的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 設定消費日誌服務起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 設定日誌服務的訊息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。
注意,flink stream的子任務數量和日誌服務LogStore中的shard數量是獨立的,如果shard數量多於子任務數量,每個子任務不重複的消費多個shard,如果少於,
那麼部分子任務就會空閒,等到新的shard產生。
1.2 設定消費起始位置
Flink log consumer支援設定shard的消費起始位置,通過設定屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定製消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:
Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的資料開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的資料開始消費。
UnixTimestamp: 一個整型數值的字串,用1970-01-01到現在的秒數表示, 含義是消費shard中這個時間點之後的資料。
三種取值舉例如下:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
1.3 監控:消費進度(可選)
Flink log consumer支援設定消費進度監控,所謂消費進度就是獲取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
文件消費組-檢視狀態,消費組-監控報警
。
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);
注意上面程式碼是可選的,如果設定了,consumer會首先建立consumerGroup,如果已經存在,則什麼都不做,consumer中的snapshot會自動同步到日誌服務的consumerGroup中,使用者可以在日誌服務的控制檯檢視consumer的消費進度。
1.4 容災和exactly once語義支援
當開啟Flink的checkpointing功能時,Flink log consumer會週期性的將每個shard的消費進度儲存起來,當作業失敗時,flink會恢復log consumer,並
從儲存的最新的checkpoint開始消費。
寫checkpoint的週期定義了當發生失敗時,最多多少的資料會被回溯,也就是重新消費,使用程式碼如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟flink exactly once語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s儲存一次checkpoint
env.enableCheckpointing(5000);
更多Flink checkpoint的細節請參考Flink官方文件Checkpoints。
1.5 補充材料:關聯 API與許可權設定
Flink log consumer 會用到的阿里雲日誌服務介面如下:
GetCursorOrData
用於從shard中拉資料, 注意頻繁的呼叫該介面可能會導致資料超過日誌服務的shard quota, 可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制介面呼叫的時間間隔和每次呼叫拉取的日誌數量,shard的quota參考文章[shard簡介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
ListShards
用於獲取logStore中所有的shard列表,獲取shard狀態等.如果您的shard經常發生分裂合併,可以通過調整介面的呼叫週期來及時發現shard的變化。
// 設定每30s呼叫一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
CreateConsumerGroup
該介面呼叫只有當設定消費進度監控時才會發生,功能是建立consumerGroup,用於同步checkpoint。
ConsumerGroupUpdateCheckPoint
該介面使用者將flink的snapshot同步到日誌服務的consumerGroup中。
子使用者使用Flink log consumer需要授權如下幾個RAM Policy:
2. Log Producer
FlinkLogProducer 用於將資料寫到阿里雲日誌服務中。
注意producer只支援Flink at-least-once語義,這就意味著在發生作業失敗的情況下,寫入日誌服務中的資料有可能會重複,但是絕對不會丟失。
用法示例如下,我們將模擬產生的字串寫入日誌服務:
// 將資料序列化成日誌服務的資料格式
class SimpleLogSerializer implements LogSerializationSchema<String> {
public RawLogGroup serialize(String element) {
RawLogGroup rlg = new RawLogGroup();
RawLog rl = new RawLog();
rl.setTime((int)(System.currentTimeMillis() / 1000));
rl.addContent("message", element);
rlg.addLog(rl);
return rlg;
}
}
public class ProducerSample {
public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
public static String sAccessKeyId = "";
public static String sAccessKey = "";
public static String sProject = "ali-cn-hangzhou-sls-admin";
public static String sLogstore = "test-flink-producer";
private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(3);
DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
Properties configProps = new Properties();
// 設定訪問日誌服務的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
// 設定訪問日誌服務的ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
// 設定日誌寫入的日誌服務project
configProps.put(ConfigConstants.LOG_PROJECT, sProject);
// 設定日誌寫入的日誌服務logStore
configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
simpleStringStream.addSink(logProducer);
env.execute("flink log producer");
}
// 模擬產生日誌
public static class EventsGenerator implements SourceFunction<String> {
private boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
long seq = 0;
while (running) {
Thread.sleep(10);
ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
}
}
@Override
public void cancel() {
running = false;
}
}
}
2.1 初始化
Producer初始化主要需要做兩件事情:
初始化配置引數Properties, 這一步和Consumer類似, Producer有一些定製的引數,一般情況下使用預設值即可,特殊場景可以考慮定製:
// 用於傳送資料的io執行緒的數量,預設是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 該值定義日誌資料被快取傳送的時間,預設是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 快取傳送的包中日誌的數量,預設是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 快取傳送的包的大小,預設是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作業可以使用的記憶體總的大小,預設是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述引數不是必選引數,使用者可以不設定,直接使用預設值。
過載LogSerializationSchema,定義將資料序列化成RawLogGroup的方法。
RawLogGroup是log的集合,每個欄位的含義可以參考文件[日誌資料模型](https://help.aliyun.com/document_detail/29054.html)。
如果使用者需要使用日誌服務的shardHashKey功能,指定資料寫到某一個shard中,可以使用LogPartitioner產生資料的hashKey,用法例子如下:
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {
// 生成32位hash值
public String getHashKey(String element) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(element.getBytes());
String hash = new BigInteger(1, md.digest()).toString(16);
while(hash.length() < 32) hash = "0" + hash;
return hash;
} catch (NoSuchAlgorithmException e) {
}
return "0000000000000000000000000000000000000000000000000000000000000000";
}
});
注意LogPartitioner是可選的,不設定情況下, 資料會隨機寫入某一個shard。
2.2 許可權設定:RAM Policy
Producer依賴日誌服務的API寫資料,如下:
log:PostLogStoreLogs
log:ListShards
當RAM子使用者使用Producer時,需要對上述兩個API進行授權: