1. 程式人生 > >kafkaspot在ack機制下如何保證記憶體不溢

kafkaspot在ack機制下如何保證記憶體不溢

http://www.cnblogs.com/intsmaze/p/5947078.html
storm框架中的kafkaspout類實現的是BaseRichSpout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新發送;如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重複傳送。 但是回想一下我們自己寫一個spout類實現BaseRichSpout並讓他具備訊息重發,那麼我們是會在我們的spout類裡面定義一個map集合,並以msgId作為key。 複製程式碼
public class MySpout extends BaseRichSpout {
    private static
final long serialVersionUID = 5028304756439810609L; // key:messageId,Data private HashMap<String, String> waitAck = new HashMap<String, String>(); private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new
Fields("sentence")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { String sentence = "the cow jumped over the moon"; String messageId = UUID.randomUUID().toString().replaceAll("-", ""); waitAck.put(messageId, sentence); //
指定messageId,開啟ackfail機制 collector.emit(new Values(sentence), messageId); } @Override public void ack(Object msgId) { System.out.println("訊息處理成功:" + msgId); System.out.println("刪除快取中的資料..."); waitAck.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("訊息處理失敗:" + msgId); System.out.println("重新發送失敗的資訊..."); //重發如果不開啟ackfail機制,那麼spout的map物件中的該資料不會被刪除的,而且下游 collector.emit(new Values(waitAck.get(msgId)),msgId); } }
複製程式碼 那麼kafkaspout會不會也是這樣還儲存這已傳送未收到bolt響應的訊息呢?如果這樣,如果訊息處理不斷失敗,不斷重發,訊息不斷積累在kafkaspout節點上,kafkaspout端會不就會出現記憶體溢位? 其實並沒有,回想kafka的原理,Kafka會為每一個consumergroup保留一些metadata資訊–當前消費的訊息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條訊息後線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些訊息。也就是說,kafkaspot在消費kafka的資料是,通過offset讀取到訊息併發送給bolt後,kafkaspot只是儲存者當前的offset值。 當失敗或成功根據msgId查詢offset值,然後再去kafka消費該資料來確保訊息的重新發送。 那麼雖然offset資料小,但是當offset的資料量上去了還是會記憶體溢位的? 其實並沒有,kafkaspout發現快取的資料超過限制了,會把某端的資料清理掉的。 kafkaspot中傳送資料的程式碼
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID裡面包裝了offset引數。 它不快取已經發送出去的資料資訊。 當他接收到來至bolt的響應後,會從接收到的msgId中得到offset。以下是從原始碼中折取的關鍵程式碼: 複製程式碼
public void ack(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
          m.ack(id.offset);
     }
 }
 m.ack(id.offset);
 public void ack(Long offset) {
     _pending.remove(offset);//處理成功移除offset
     numberAcked++;
 }



 public void fail(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
         m.fail(id.offset);
      }
  }
  m.fail(id.offset);
  public void fail(Long offset) {
     failed.add(offset);//處理失敗新增offset
        numberFailed++;
   }
    
    SortedSet<Long> _pending = new TreeSet<Long>();
    SortedSet<Long> failed = new TreeSet<Long>();
複製程式碼

關於kafkaspot的原始碼解析大家可以看這邊部落格:http://www.cnblogs.com/cruze/p/4241181.html

原始碼解析中涉及了很多kafka的概念,所以僅僅理解kafka的概念想完全理解kafkaspot原始碼是很難的,如果不理解kafka概念,那麼就只需要在理解storm的ack機制上明白kafkaspot做了上面的兩件事就可以了。