jstorm集成kafka
本人是spark的擁躉,因為工作中需要用到jstorm,作記錄如下。
pom.xml
<dependencies> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.6</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.6</version> </dependency> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>utf8</encoding> </configuration> </plugin> </plugins> </build>
沒什麽好說的,無非都是常規的東西。需要註意的是做好kafka的offset的維護。
其它需要註意的兩點,異常消息的處理和限流。
異常消息的處理其實就是ack/fail的問題。使用BaseBasicBolt的話,它會自動幫你實現ack與fail。但需要手動拋出FailException。這樣的話,一旦出現異常,整個topology就退出集群了,這是不可接受的。
無奈只有使用IRichBolt,手動去捕獲異常。這樣如果異常不是數據結構的問題,只是下遊比如獲取其它數據連接問題比如郵件服務器問題,那麽失敗了不去更新offset,下次啟動的時候還能繼續消費。也無需去手動重發。
如果需要重發的話,storm只能自己實現,jstorm可以通過以下方式:
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
想要實現ack/fail必須滿足以下三點:
1. 在spout emit tuple的時候,要加上第3個參數messageid
2. 在配置中acker數目至少為1
3. 在bolt emit的時候,要加上第二個參數anchor tuple,以保持tracker鏈路
第1點,由於使用的是KafkaSpout,已經實現了。
第2點:
config.setNumAckers(1);
第3點:
collector.emit(input,new Values(map));
限流的問題,主要是考慮到可能會有的這麽一種場景。如果jstorm集群意外退出,或者升級出現情況,導致長時間無法重啟。而這時候kafka集群生產端消息源源不斷在產生新的消息。當重啟jstorm集群的時候,勢必會導致消息大量湧入jstorm集群。
還有一種場景就是,消息量不穩定,時大時小,那麽非常有必須設置這個參數進行限流。
那麽這時候需要對消息進行限流。在spark streaming中可以對kafka每個分區每秒的消息數進行限制;考慮到如果直接寫死一個值,在低谷期間會造成資源的浪費,可以通過資源實現情況來限流。
而jstorm則通過topology.max.spout.pending來設置。它表示jstorm集群中可能緩存也就是待消費的消息數。如果大於這個數,新的消息就不會進來。
jstorm集成kafka