1. 程式人生 > >如何用Flink把資料sink到kafka多個(成百上千)topic中

如何用Flink把資料sink到kafka多個(成百上千)topic中

需求與場景

上游某業務資料量特別大,進入到kafka一個topic中(當然了這個topic的partition數必然多,有人肯定疑問為什麼非要把如此龐大的資料寫入到1個topic裡,歷史留下的問題,現狀就是如此龐大的資料集中在一個topic裡)。這就需要根據一些業務規則把這個大資料量的topic資料分發到多個(成百上千)topic中,以便下游的多個job去消費自己topic的資料,這樣上下游之間的耦合性就降低了,也讓下游的job輕鬆了很多,下游的job只處理屬於自己的資料,避免成百上千的job都去消費那個大資料量的topic。資料被分發之後再讓下游job去處理 對網路頻寬、程式效能、演算法複雜性都有好處。

這樣一來就需要 這麼一個分發程式,把上下游job連線起來。

分析與思考

  1. Flink中有connect運算元,可以連線2個流,在這裡1個就是上面資料量龐大的業務資料流,另外1個就是規則流(或者叫做配置流,也就是決定根據什麼樣的規則分發業務資料)

  2. 但是問題來了,根據規則分發好了,如何把這些資料sink到kafka多個(成百上千)topic中呢?

  3. 首先想到的就是新增多個sink,每分發到一個topic,就多新增1個addSink操作,這對於如果只是分發到2、3個topic適用的,我看了一下專案中有時候需要把資料sink到2個topic中,同事中就有人添加了2個sink,完全ok,但是在這裡要分發到幾十個、成百上千個topic,就肯定不現實了,不需要解釋吧。

  4. sink到kafka中,其實本質上就是用KafkaProducer往kafka寫資料,那麼不知道有沒有想起來,用KafkaProducer寫資料的時候api是怎樣的,public Future<RecordMetadata> send(ProducerRecord<K, V> record); 顯然這裡需要一個ProducerRecord物件,再看如何例項化ProducerRecord物件,public ProducerRecord(String topic, V value), 也就是說每一個message都指定topic,標明是寫到哪一個topic的,而不必說 我們要寫入10個不同的topic中,我們就一定new 10 個 KafkaProducer

  5. 到上面這一步,如果懂的人就會豁然開朗了,我本來想著可能需要稍微改改flink-connector-kafka實現,讓我驚喜的是flink-connector-kafka已經留有了介面,只要實現KeyedSerializationSchema這個介面的String getTargetTopic(T element);就行

程式碼實現

先看一下KeyedSerializationSchema介面的定義,我們知道kafka中儲存的都是byte[],所以由我們自定義序列化key、value

/**
 * The serialization schema describes how to turn a data object into a different serialized
 * representation. Most data sinks (for example Apache Kafka) require the data to be handed
 * to them in a specific format (for example as byte strings).
 *
 * @param <T> The type to be serialized.
 */
@PublicEvolving
public interface KeyedSerializationSchema<T> extends Serializable {

    /**
     * Serializes the key of the incoming element to a byte array
     * This method might return null if no key is available.
     *
     * @param element The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    byte[] serializeKey(T element);

    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param element The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    byte[] serializeValue(T element);

    /**
     * Optional method to determine the target topic for the element.
     *
     * @param element Incoming element to determine the target topic from
     * @return null or the target topic
     */
    String getTargetTopic(T element);
}

重點來了,實現這個String getTargetTopic(T element);就可以決定這個message寫入到哪個topic裡。

於是 我們可以這麼做,拿到業務資料(我們用的是json格式),然後根據規則分發的時候,就在這條json格式的業務資料裡新增一個寫到哪個topic的欄位,比如說叫@topic
然後我們實現getTargetTopic()方法的時候,從業務資料中取出@topic欄位就行了。

實現如下(這裡我是用scala寫的,java類似):

class OverridingTopicSchema extends KeyedSerializationSchema[Map[String, Any]] {

  override def serializeKey(element: Map[String, Any]): Array[Byte] = null

  override def serializeValue(element: Map[String, Any]): Array[Byte] = JsonTool.encode(element) //這裡用JsonTool指代json序列化的工具類

  /**
    * kafka message value 根據 @topic欄位 決定 往哪個topic寫
    * @param element
    * @return
    */
  override def getTargetTopic(element: Map[String, Any]): String = {
    if (element != null && element.contains(“@topic”)) {
      element(“@topic”).toString
    } else null
  }
}

之後在new FlinkKafkaProducer物件的時候 把上面我們實現的這個OverridingTopicSchema傳進去就行了。

public FlinkKafkaProducer(
        String defaultTopicId,  // 如果message沒有指定寫往哪個topic,就寫入這個預設的topic
        KeyedSerializationSchema<IN> serializationSchema,//傳入我們自定義的OverridingTopicSchema
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
        FlinkKafkaProducer.Semantic semantic,
        int kafkaProducersPoolSize) {
                    //....
}

至此,我們只需要把上面new 出來的FlinkKafkaProducer新增到addSink中就能實現把資料sink到kafka多個(成百上千)topic中。

下面簡單追蹤一下FlinkKafkaProducer原始碼,看看flink-connector-kafka是如何將我們自定義的KeyedSerializationSchema作用於最終的ProducerRecord

        /**  這個是使用者可自定義的序列化實現
     * (Serializable) SerializationSchema for turning objects used with Flink into.
     * byte[] for Kafka.
     */
    private final KeyedSerializationSchema<IN> schema;

        @Override
    public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
        checkErroneous();
// 呼叫我們自己的實現的schema序列化message中的key
        byte[] serializedKey = schema.serializeKey(next);

// 呼叫我們自己的實現的schema序列化message中的value
        byte[] serializedValue = schema.serializeValue(next);
                
// 呼叫我們自己的實現的schema取出寫往哪個topic
        String targetTopic = schema.getTargetTopic(next);

        if (targetTopic == null) {
// 如果沒有指定寫往哪個topic,就寫往預設的topic
// 這個預設的topic是我們new  FlinkKafkaProducer時候作為第一個構造引數傳入(見上面的註釋)
            targetTopic = defaultTopicId;
        }

        Long timestamp = null;
        if (this.writeTimestampToKafka) {
            timestamp = context.timestamp();
        }
        ProducerRecord<byte[], byte[]> record;
        int[] partitions = topicPartitionsMap.get(targetTopic);
        if (null == partitions) {
            partitions = getPartitionsByTopic(targetTopic, transaction.producer);
            topicPartitionsMap.put(targetTopic, partitions);
        }
        if (flinkKafkaPartitioner != null) {
            record = new ProducerRecord<>(
                targetTopic, // 這裡看到了我們上面一開始分析的ProducerRecord
                flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
                timestamp,
                serializedKey,
                serializedValue);
        } else {
            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
        }
        pendingRecords.incrementAndGet();
        transaction.producer.send(record, callback);
    }