如何用Flink把資料sink到kafka多個(成百上千)topic中
需求與場景
上游某業務資料量特別大,進入到kafka一個topic中(當然了這個topic的partition數必然多,有人肯定疑問為什麼非要把如此龐大的資料寫入到1個topic裡,歷史留下的問題,現狀就是如此龐大的資料集中在一個topic裡)。這就需要根據一些業務規則把這個大資料量的topic資料分發到多個(成百上千)topic中,以便下游的多個job去消費自己topic的資料,這樣上下游之間的耦合性就降低了,也讓下游的job輕鬆了很多,下游的job只處理屬於自己的資料,避免成百上千的job都去消費那個大資料量的topic。資料被分發之後再讓下游job去處理 對網路頻寬、程式效能、演算法複雜性都有好處。
這樣一來就需要 這麼一個分發程式,把上下游job連線起來。
分析與思考
Flink中有connect運算元,可以連線2個流,在這裡1個就是上面資料量龐大的業務資料流,另外1個就是規則流(或者叫做配置流,也就是決定根據什麼樣的規則分發業務資料)
但是問題來了,根據規則分發好了,如何把這些資料sink到kafka多個(成百上千)topic中呢?
首先想到的就是新增多個sink,每分發到一個topic,就多新增1個addSink操作,這對於如果只是分發到2、3個topic適用的,我看了一下專案中有時候需要把資料sink到2個topic中,同事中就有人添加了2個sink,完全ok,但是在這裡要分發到幾十個、成百上千個topic,就肯定不現實了,不需要解釋吧。
sink到kafka中,其實本質上就是用
KafkaProducer
往kafka寫資料,那麼不知道有沒有想起來,用KafkaProducer
寫資料的時候api是怎樣的,public Future<RecordMetadata> send(ProducerRecord<K, V> record);
顯然這裡需要一個ProducerRecord物件,再看如何例項化ProducerRecord
物件,public ProducerRecord(String topic, V value)
, 也就是說每一個message都指定topic,標明是寫到哪一個topic的,而不必說 我們要寫入10個不同的topic中,我們就一定new 10 個 KafkaProducer到上面這一步,如果懂的人就會豁然開朗了,我本來想著可能需要稍微改改flink-connector-kafka實現,讓我驚喜的是flink-connector-kafka已經留有了介面,只要實現
KeyedSerializationSchema
這個介面的String getTargetTopic(T element);
就行
程式碼實現
先看一下KeyedSerializationSchema
介面的定義,我們知道kafka中儲存的都是byte[],所以由我們自定義序列化key、value
/**
* The serialization schema describes how to turn a data object into a different serialized
* representation. Most data sinks (for example Apache Kafka) require the data to be handed
* to them in a specific format (for example as byte strings).
*
* @param <T> The type to be serialized.
*/
@PublicEvolving
public interface KeyedSerializationSchema<T> extends Serializable {
/**
* Serializes the key of the incoming element to a byte array
* This method might return null if no key is available.
*
* @param element The incoming element to be serialized
* @return the key of the element as a byte array
*/
byte[] serializeKey(T element);
/**
* Serializes the value of the incoming element to a byte array.
*
* @param element The incoming element to be serialized
* @return the value of the element as a byte array
*/
byte[] serializeValue(T element);
/**
* Optional method to determine the target topic for the element.
*
* @param element Incoming element to determine the target topic from
* @return null or the target topic
*/
String getTargetTopic(T element);
}
重點來了,實現這個String getTargetTopic(T element);
就可以決定這個message寫入到哪個topic裡。
於是 我們可以這麼做,拿到業務資料(我們用的是json格式),然後根據規則分發的時候,就在這條json格式的業務資料裡新增一個寫到哪個topic的欄位,比如說叫@topic
,
然後我們實現getTargetTopic()
方法的時候,從業務資料中取出@topic
欄位就行了。
實現如下(這裡我是用scala寫的,java類似):
class OverridingTopicSchema extends KeyedSerializationSchema[Map[String, Any]] {
override def serializeKey(element: Map[String, Any]): Array[Byte] = null
override def serializeValue(element: Map[String, Any]): Array[Byte] = JsonTool.encode(element) //這裡用JsonTool指代json序列化的工具類
/**
* kafka message value 根據 @topic欄位 決定 往哪個topic寫
* @param element
* @return
*/
override def getTargetTopic(element: Map[String, Any]): String = {
if (element != null && element.contains(“@topic”)) {
element(“@topic”).toString
} else null
}
}
之後在new FlinkKafkaProducer
物件的時候 把上面我們實現的這個OverridingTopicSchema
傳進去就行了。
public FlinkKafkaProducer(
String defaultTopicId, // 如果message沒有指定寫往哪個topic,就寫入這個預設的topic
KeyedSerializationSchema<IN> serializationSchema,//傳入我們自定義的OverridingTopicSchema
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
//....
}
至此,我們只需要把上面new 出來的FlinkKafkaProducer
新增到addSink中就能實現把資料sink到kafka多個(成百上千)topic中。
下面簡單追蹤一下FlinkKafkaProducer
原始碼,看看flink-connector-kafka是如何將我們自定義的KeyedSerializationSchema
作用於最終的ProducerRecord
/** 這個是使用者可自定義的序列化實現
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Kafka.
*/
private final KeyedSerializationSchema<IN> schema;
@Override
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
checkErroneous();
// 呼叫我們自己的實現的schema序列化message中的key
byte[] serializedKey = schema.serializeKey(next);
// 呼叫我們自己的實現的schema序列化message中的value
byte[] serializedValue = schema.serializeValue(next);
// 呼叫我們自己的實現的schema取出寫往哪個topic
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
// 如果沒有指定寫往哪個topic,就寫往預設的topic
// 這個預設的topic是我們new FlinkKafkaProducer時候作為第一個構造引數傳入(見上面的註釋)
targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
timestamp = context.timestamp();
}
ProducerRecord<byte[], byte[]> record;
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
if (flinkKafkaPartitioner != null) {
record = new ProducerRecord<>(
targetTopic, // 這裡看到了我們上面一開始分析的ProducerRecord
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}