1. 程式人生 > >jstorm集成kafka

jstorm集成kafka

spark 方式 emit lis tor nco XML 自己 face

本人是spark的擁躉,因為工作中需要用到jstorm,作記錄如下。

pom.xml

技術分享
<dependencies>

        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-nop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-jdk14</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.6.3</version>
        </dependency>

        <dependency>
            <groupId>javax.mail</groupId>
            <artifactId>mail</artifactId>
            <version>1.4.7</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>utf8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
View Code

沒什麽好說的,無非都是常規的東西。需要註意的是做好kafka的offset的維護。

其它需要註意的兩點,異常消息的處理和限流。

異常消息的處理其實就是ack/fail的問題。使用BaseBasicBolt的話,它會自動幫你實現ack與fail。但需要手動拋出FailException。這樣的話,一旦出現異常,整個topology就退出集群了,這是不可接受的。

無奈只有使用IRichBolt,手動去捕獲異常。這樣如果異常不是數據結構的問題,只是下遊比如獲取其它數據連接問題比如郵件服務器問題,那麽失敗了不去更新offset,下次啟動的時候還能繼續消費。也無需去手動重發。

如果需要重發的話,storm只能自己實現,jstorm可以通過以下方式:

public interface IFailValueSpout { void fail(Object msgId, List<object>values); }

想要實現ack/fail必須滿足以下三點:

1. 在spout emit tuple的時候,要加上第3個參數messageid
2. 在配置中acker數目至少為1
3. 在bolt emit的時候,要加上第二個參數anchor tuple,以保持tracker鏈路

第1點,由於使用的是KafkaSpout,已經實現了。

第2點:

config.setNumAckers(1);

第3點:

collector.emit(input,new Values(map));

限流的問題,主要是考慮到可能會有的這麽一種場景。如果jstorm集群意外退出,或者升級出現情況,導致長時間無法重啟。而這時候kafka集群生產端消息源源不斷在產生新的消息。當重啟jstorm集群的時候,勢必會導致消息大量湧入jstorm集群。

還有一種場景就是,消息量不穩定,時大時小,那麽非常有必須設置這個參數進行限流。

那麽這時候需要對消息進行限流。在spark streaming中可以對kafka每個分區每秒的消息數進行限制;考慮到如果直接寫死一個值,在低谷期間會造成資源的浪費,可以通過資源實現情況來限流。

而jstorm則通過topology.max.spout.pending來設置。它表示jstorm集群中可能緩存也就是待消費的消息數。如果大於這個數,新的消息就不會進來。

jstorm集成kafka