1. 程式人生 > >漫遊Kafka實戰篇之客戶端程式設計例項

漫遊Kafka實戰篇之客戶端程式設計例項

Kafka Producer APIs

新版的Producer API提供了以下功能:
  1. 可以將多個訊息快取到本地佇列裡,然後非同步的批量傳送到broker,可以通過引數producer.type=async做到。快取的大小可以通過一些引數指定:queue.timebatch.size。一個後臺執行緒((kafka.producer.async.ProducerSendThread)從佇列中取出資料並讓kafka.producer.EventHandler將訊息傳送到broker,也可以通過引數event.handler定製handler,在producer端處理資料的不同的階段註冊處理器,比如可以對這一過程進行日誌追蹤,或進行一些監控。只需實現kafka.producer.async.CallbackHandler
    介面,並在callback.handler中配置。
  2. 自己編寫Encoder來序列化訊息,只需實現下面這個介面。預設的Encoder是kafka.serializer.DefaultEncoder
    interface Encoder<T> {
      public Message toMessage(T data);
    }
  3. 提供了基於Zookeeper的broker自動感知能力,可以通過引數zk.connect實現。如果不使用Zookeeper,也可以使用broker.list引數指定一個靜態的brokers列表,這樣訊息將被隨機的傳送到一個broker上,一旦選中的broker失敗了,訊息傳送也就失敗了。
  4. 通過分割槽函式kafka.producer.Partitioner類對訊息分割槽
    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }
    分割槽函式有兩個引數:key和可用的分割槽數量,從分割槽列表中選擇一個分割槽並返回id。預設的分割槽策略是hash(key)%numPartitions.如果key是null,就隨機的選擇一個。可以通過引數partitioner.class定製分割槽函式。

新的api完整例項如下:

package com.cuicui.kafkademon;


import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


/**
 * @author <a href="mailto:
[email protected]
">崔磊</a>  * @date 2015年11月4日 上午11:44:15  */ public class MyProducer {     public static void main(String[] args) throws InterruptedException {         Properties props = new Properties();         props.put("serializer.class", "kafka.serializer.StringEncoder");         props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);         props.put("partitioner.class", "com.cuicui.kafkademon.MyPartitioner");         props.put("request.required.acks", "1");         ProducerConfig config = new ProducerConfig(props);         Producer<String, String> producer = new Producer<String, String>(config);         // 單個傳送         for (int i = 0; i <= 1000000; i++) {             KeyedMessage<String, String> message =                     new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);             producer.send(message);             Thread.sleep(5000);         }         // 批量傳送         List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);         for (int i = 0; i <= 10000; i++) {             KeyedMessage<String, String> message =                     new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);             messages.add(message);             if (i % 100 == 0) {                 producer.send(messages);                 messages.clear();             }         }         producer.send(messages);     } }

下面這個是用到的分割槽函式:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;


public class MyPartitioner implements Partitioner {
    public MyPartitioner(VerifiableProperties props) {


    }


    /*
     * @see kafka.producer.Partitioner#partition(java.lang.Object, int)
     */
    @Override
    public int partition(Object key, int partitionCount) {
        return Integer.valueOf((String) key) % partitionCount;
    }
}


KafKa Consumer APIs

Consumer API有兩個級別。低級別的和一個指定的broker保持連線,並在接收完訊息後關閉連線,這個級別是無狀態的,每次讀取訊息都帶著offset。

高級別的API隱藏了和brokers連線的細節,在不必關心服務端架構的情況下和服務端通訊。還可以自己維護消費狀態,並可以通過一些條件指定訂閱特定的topic,比如白名單黑名單或者正則表示式。

低級別的API

package com.cuicui.kafkademon;


import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;


/**
 * offset自己維護 目標topic、partition均由自己分配
 * 
 * @author <a href="mailto:[email protected]">崔磊</a>
 * @date 2015年11月4日 上午11:44:15
 *
 */
public class MySimpleConsumer {


    public static void main(String[] args) {
        new MySimpleConsumer().consume();
    }


    /**
     * 消費訊息
     */
    public void consume() {
        int partition = 0;


        // 找到leader
        Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);


        // 從leader消費
        SimpleConsumer simpleConsumer =
                new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
        long startOffet = 1;
        int fetchSize = 1000;


        while (true) {
            long offset = startOffet;
            // 新增fetch指定目標tipic,分割槽,起始offset及fetchSize(位元組),可以新增多個fetch
            FetchRequest req =
                    new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();


            // 拉取訊息
            FetchResponse fetchResponse = simpleConsumer.fetch(req);


            ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);
            for (MessageAndOffset messageAndOffset : messageSet) {
                Message mess = messageAndOffset.message();
                ByteBuffer payload = mess.payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                String msg = new String(bytes);


                offset = messageAndOffset.offset();
                System.out.println("partition : " + 3 + ", offset : " + offset + "  mess : " + msg);
            }
            // 繼續消費下一批
            startOffet = offset + 1;
        }
    }


    /**
     * 找到制定分割槽的leader broker
     * 
     * @param brokerHosts broker地址,格式為:“host1:port1,host2:port2,host3:port3”
     * @param topic topic
     * @param partition 分割槽
     * @return
     */
    public Broker findLeader(String brokerHosts, String topic, int partition) {
        Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();
        System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(),
                leader.port()));
        return leader;
    }


    /**
     * 找到指定分割槽的元資料
     * 
     * @param brokerHosts broker地址,格式為:“host1:port1,host2:port2,host3:port3”
     * @param topic topic
     * @param partition 分割槽
     * @return 元資料
     */
    private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        for (String brokerHost : brokerHosts.split(",")) {
            SimpleConsumer consumer = null;
            String[] splits = brokerHost.split(":");
            consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest request = new TopicMetadataRequest(topics);
            TopicMetadataResponse response = consumer.send(request);
            List<TopicMetadata> topicMetadatas = response.topicsMetadata();
            for (TopicMetadata topicMetadata : topicMetadatas) {
                for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {
                    if (PartitionMetadata.partitionId() == partition) {
                        returnMetaData = PartitionMetadata;
                    }
                }
            }
            if (consumer != null)
                consumer.close();
        }
        return returnMetaData;
    }


    /**
     * 根據時間戳找到某個客戶端消費的offset
     * 
     * @param consumer SimpleConsumer
     * @param topic topic
     * @param partition 分割槽
     * @param clientID 客戶端的ID
     * @param whichTime 時間戳
     * @return offset
     */
    public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}
低級別的API是高級別API實現的基礎,也是為了一些對維持消費狀態有特殊需求的場景,比如Hadoop consumer這樣的離線consumer。

高級別的API

package com.cuicui.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;


/**
 * offset在zookeeper中記錄,以group.id為key 分割槽和customer的對應關係由Kafka維護
 * 
 * @author <a href="mailto:[email protected]">崔磊</a>
 * @date 2015年11月4日 上午11:44:15
 */
public class MyHighLevelConsumer {

    /**
     * 該consumer所屬的組ID
     */
    private String groupid;

    /**
     * 該consumer的ID
     */
    private String consumerid;

    /**
     * 每個topic開幾個執行緒?
     */
    private int threadPerTopic;

    public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {
        super();
        this.groupid = groupid;
        this.consumerid = consumerid;
        this.threadPerTopic = threadPerTopic;
    }

    public void consume() {
        Properties props = new Properties();
        props.put("group.id", groupid);
        props.put("consumer.id", consumerid);
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        // props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        // 設定每個topic開幾個執行緒
        topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);

        // 獲取stream
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        // 為每個stream啟動一個執行緒消費訊息
        for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {
            new MyStreamThread(stream).start();
        }
    }

    /**
     * 每個consumer的內部執行緒
     * 
     * @author cuilei05
     *
     */
    private class MyStreamThread extends Thread {
        private KafkaStream<byte[], byte[]> stream;

        public MyStreamThread(KafkaStream<byte[], byte[]> stream) {
            super();
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

            // 逐條處理訊息
            while (streamIterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> message = streamIterator.next();
                String topic = message.topic();
                int partition = message.partition();
                long offset = message.offset();
                String key = new String(message.key());
                String msg = new String(message.message());
                // 在這裡處理訊息,這裡僅簡單的輸出
                // 如果訊息消費失敗,可以將已上資訊列印到日誌中,活著傳送到報警簡訊和郵件中,以便後續處理
                System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()
                        + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "
                        + key + " , mess : " + msg);
            }
        }
    }

    public static void main(String[] args) {
        String groupid = "myconsumergroup";
        MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);
        MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);

        consumer1.consume();
        consumer2.consume();
    }
}
這個API圍繞著由KafkaStream實現的迭代器展開,每個流代表一系列從一個或多個分割槽多和broker上匯聚來的訊息,每個流由一個執行緒處理,所以客戶端可以在建立的時候通過引數指定想要幾個流。一個流是多個分割槽多個broker的合併,但是每個分割槽的訊息只會流向一個流。

每呼叫一次createMessageStreams都會將consumer註冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整。API鼓勵每次呼叫建立更多的topic流以減少這種調整。createMessageStreamsByFilter方法註冊監聽可以感知新的符合filter的tipic。

相關推薦

漫遊Kafka實戰客戶程式設計例項

Kafka Producer APIs 新版的Producer API提供了以下功能:可以將多個訊息快取到本地佇列裡,然後非同步的批量傳送到broker,可以通過引數producer.type=async做到。快取的大小可以通過一些引數指定:queue.time和

漫遊Kafka實戰客戶API

Kafka Producer APIs 舊版的Procuder API有兩種:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它們都實現了同一個介面: class Producer 

漫遊Kafka入門簡單介紹

釋出訊息通常有兩種模式:佇列模式(queuing)和釋出-訂閱模式(publish-subscribe)。佇列模式中,consumers可以同時從服務端讀取訊息,每個訊息只被其中一個consumer讀到;釋出-訂閱模式中訊息被廣播到所有的consumer中。 Consumers可以加入一個consumer

漫遊Kafka實現訊息和日誌

訊息格式 日誌 一個叫做“my_topic”且有兩個分割槽的的topic,它的日誌有兩個資料夾組成,my_topic_0和my_topic_1,每個資料夾裡放著具體的資料檔案,每個資料檔案都是一系列的日誌實體,每個日誌實體有一個4個位元組的整數N標註訊息的長度,後邊

linux tcp多執行緒伺服器與客戶程式設計例項

伺服器端: #include<iostream> #include<arpa/inet.h> #include<sys/socket.h> #include<cstdlib> #include<cstdio> #i

JavaScript高階程式設計客戶檢測

瀏覽器檢測,是個很讓人頭痛但又必須要考慮的一個問題。畢竟市面上的瀏覽器廠商眾多,雖然有些標準化的東西約束著他們,但是有些東西,他們還是各自按自己的意願來。這其中以IE瀏覽器最為突出,它幾乎可以被稱之為萬惡之源。 瀏覽器檢測的方法有很多,這裡介紹的可能並不是全部,但基本上都是比較常用的方法。下邊我就來詳細介紹

Java網路程式設計7.TCP網路程式設計客戶鍵盤錄入伺服器控制檯輸出

TCP網路程式設計之客戶端鍵盤錄入伺服器控制檯輸出 1、鍵盤錄入資料 BufferedReader br = new BufferedReader(new InputStreamReader(S

Asp.net Ajax客戶程式設計頁面生命週期和請求生命週期

  Asp.net Ajax客戶端程式設計之頁面生命週期和請求生命週期     Asp.net Ajax框架為客戶端也賦上了頁面的生命週期,類似於伺服器段的頁面事件:PreInit,Init,CompleteInit,PreLoad,Load,CompleteLoad,Pr

ZK客戶程式設計Checkbox複選框的(全選/全不選)

on June 30, 2010, I have met a question In ZK client programming, 使用 ZK的方便之處就是基於注  解"@{}",但所有的事物都是雙刃劍,當我們使用註解實現AfterCompse介面 zk自動幫我們繫結後臺的物

建立資料庫並實現TCP通訊,客戶登陸伺服器身份驗證服務程式設計

m_pRecordsetB->Open((_variant_t)flsql,dlg->m_pConnectionB.GetInterfacePtr(), adOpenDynamic, adLockOptimistic, adCmdText);//開啟資料庫獲得符合條件              

C# Socket網路程式設計客戶群發訊息

接上一片部落格,接著socket的學習。此次寫的是伺服器端被多個客戶端連線,並且一個客戶端發訊息,其他連線的客戶端都可接收到。 伺服器端設計思路: 1.要有一個執行緒監聽埠,當有客戶端連線上時,就要產生一個socket物件負責和這個客戶端通訊,此時需要開啟一個執行緒處理與這

netty框架學習初始---多客戶的實現補充部分

上一篇文章中並沒有太過詳細的講解,而且經過今天一天的瞎搞,弄清了幾個問題,於是在這裡先補充一下,也有幾個地方對前面的文章做一下修正。 1.關於HelloServerInitializer(後面我改成了ServerInitalizer,畢竟專案不能叫做hello什麼吧。。。)

完成埠(CompletionPort)客戶

** 完成埠之客戶端篇 ** 首先說一下這篇文章的初衷。不久前工作中要用到網路通訊進行資料交換,既然要通訊當然要有伺服器和客戶端,於是乎把MFC中的CAsyncSocket搬過來用了,簡單的過載幾個函式就完成了資料收發,但是後續遇到了較多問題,

WCF系列教程客戶異步調用服務

1.5 void 添加引用 dsl idt pan important 配置 但是 本文參考自http://www.cnblogs.com/wangweimutou/p/4409227.html,純屬讀書筆記,加深記憶 一、簡介 在前面的隨筆中,詳細的介紹了WCF客戶端服務

Netty入門客戶與服務通信(二)

ktr 數據格式 lis boot ride owa 參數 val cef Netty入門之客戶端與服務端通信(二) 一.簡介   在上一篇博文中筆者寫了關於Netty入門級的Hello World程序。書接上回,本博文是關於客戶端與服務端的通信,感覺也沒什麽好說的了,直接

Spring Cloud客戶負載平衡器:Ribbon

highlight 情況下 upd block poll sla conf project 遠程服務 Ribbon是一個客戶端負載均衡器,它可以很好地控制HTTP和TCP客戶端的行為。Feign已經使用Ribbon,所以如果您使用@FeignClient,則本節也適用。

跟我學習Spring Cloud客戶負載平衡器:Ribbon

電子商務 springcloud spring cloud springcloud微服務 微服務雲架構 Ribbon是一個客戶端負載均衡器,它可以很好地控制HTTP和TCP客戶端的行為。Feign已經使用Ribbon,所以如果您使用@FeignClient,則本節也適用。Ribbon中的中

02_HTML5+CSS3詳解第五天(實戰HTML5制作企業網站)

shu 文檔 href baidu mindjet mmap .mm 如何 鏈接 Details 一、Xmind部分 xmind教程:http://www.jianshu.com/p/7c488d5e4bdf xmind安裝破解(百度網盤鏈接:https://pan.ba

java在線聊天項目0.9版 實現把服務接收到的信息返回給每一個客戶窗口中顯示功能客戶接收

nec 一個 out for tex ava 添加 implement com 客戶端要不斷接收服務端發來的信息 與服務端不斷接收客戶端發來信息相同,使用線程的方法,在線程中循環接收 客戶端修改後代碼如下: package com.swift; import java.

zookeeper源碼客戶

服務端 run t對象 成對 bool .com 操作 code 分享   zookeeper自身提供了一個簡易的客戶端。主要包括一下幾個模塊:   1.啟動模塊。   2.核心執行模塊。   3.網絡通信模塊。 啟動模塊   啟動程序,接收和解析命令行。詳見zookeep