1. 程式人生 > 其它 >spring整合kafka簡單單元測試

spring整合kafka簡單單元測試

kafka簡介: 

Kafka 是一種分散式的,基於釋出 / 訂閱的訊息系統。主要設計目標如下:

  • 以時間複雜度為 O(1) 的方式提供訊息持久化能力,即使對 TB 級以上資料也能保證常數時間複雜度的訪問效能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒 100K 條以上訊息的傳輸。
  • 支援 Kafka Server 間的訊息分割槽,及分散式消費,同時保證每個 Partition 內的訊息順序傳輸。
  • 同時支援離線資料處理和實時資料處理。
  • Scale out:支援線上水平擴充套件。

一:配置檔案

  1.1:pom.xml中加入kafka依賴

<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2
.4.RELEASE</version> </dependency>

  1.2:匯入生產者的xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context
="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <!--引數配置 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!-- kafka服務地址,可能是叢集 value="localhost:9092,localhost:9093,localhost:9094"--> <entry key="bootstrap.servers" value="ip地址:9092" /> <!--有可能導致broker接收到重複的訊息 預設是0--> <entry key="retries" value="3" /> <!--acks=0, 表示生產者在成功寫入訊息之前不會等待任何來自伺服器的響應--> <!--acks=1, 表示只要叢集的leader分割槽副本接收到了訊息,就會向生產者傳送一個成功響應的ack--> <!--acks=all, 表示只要所有參與複製的1節點(ISR列表的副本)全部收到訊息時,生產者才會接收到來自伺服器的響應--> <entry key="ack" value="1" /> <!--producer可以用來快取資料的記憶體大小,如果資料產生速度大於向broker傳送的速度,producer會阻塞或者丟擲異常--> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 建立kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 --> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <!--設定對應topic test -是後臺已經建立好的主題--> <property name="defaultTopic" value="test" /> </bean> </beans>

  

  1.3:匯入消費者xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!--訊息監聽器-->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>

    <!-- 記得修改主題 -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.ContainerProperties">
        <!-- 建構函式 就是 主題的引數值 -->
        <constructor-arg value="test" />
        <!-- 自定義個訊息監聽器 -->
        <property name="messageListener" ref="myListnener" />
        <!--手工確定-->
        <property name="ackMode" value="MANUAL"></property>
    </bean>

    <!-- -訊息監聽器 -->
    <bean id="myListnener" class="com.zcb.kafka.MyMessageListener"></bean>

    <!-- 建立consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服務地址 -->
                <entry key="bootstrap.servers" value="ip地址:9092" />
                <!--Consumer的組ID,相同group.id的consumer屬於同一個組。 -->
                <entry key="group.id" value="test-consumer-group" />
                <!--如果此值設定為true,consumer會週期性的把當前消費的offset值儲存到zookeeper。當consumer失敗重啟之後將會使用此值作為新開始消費的值。 -->
                <entry key="enable.auto.commit" value="false" />
                <!--當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費-->
                <entry key="auto-offset-reset" value="earliest"></entry>
                <!--網路請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 -->
                <entry key="session.timeout.ms" value="15000" />

                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />

                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>


</beans>

  

  1.4:新增監聽器實體類,以及用於啟動消費者的實體類

//消費者監聽器
public class MyMessageListener implements AcknowledgingMessageListener<String,String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        String key = data.key();
        String value = data.value();
        if(key.equals("request")){
            String s = JSON.parseObject(value, String.class);
            System.out.println(s);
            //確定接收
            acknowledgment.acknowledge();
        }
    }
}
//啟動消費者
public class KafKaMain {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("spring-kafka-consumer.xml");
    }
}

  二:啟動生產者消費者

    2.1:測試類

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-kafka-producer.xml")
public class MyTest {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Test
    public void testKafka(){
        kafkaTemplate.sendDefault("request", JSON.toJSONString("hello,Word!"));
    }

}

    

    2.2:效果

我有一杯酒,足以慰風塵。