Integrating Spring with Kafka to Implement a Message Queue

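1. Set up the Kafka runtime environment. Download: http://kafka.apache.org/downloads

  • After downloading, edit config\server.properties in the installation directory: replace IP in listeners=PLAINTEXT://IP:9092 with the server's IP address, and update the ZooKeeper registration address (zookeeper.connect).
  • Start Kafka with this command:
    .\bin\windows\kafka-server-start.bat .\config\server.properties
  • Test whether messages can be sent and received:
    bin\windows\kafka-console-producer.bat --broker-list IP:9092 --topic pipi
    bin\windows\kafka-console-consumer.bat --bootstrap-server IP:9092 --topic pipi --from-beginning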
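
Before running the console tools, it can help to confirm that the address configured in listeners is reachable from the client machine. The following is a minimal sketch, not part of the original setup: the class name BrokerPortCheck and the default host and port are assumptions for illustration, and a successful TCP connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults; pass the real broker host and port as arguments
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
    }
}
```

If the check fails from another machine but succeeds locally, the listeners value or a firewall rule is a likely place to look.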

2. Set up the producer environment

  • Configure the producer.xml file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
>
    <!-- Producer properties -->
    <bean id="kafakaproducer" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="10.168.114.91:9092"/>
                <!-- group.id is a consumer setting; the producer does not use it -->
                <entry key="group.id" value="0"/>
                <entry key="retries" value="10"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- Kafka producer factory -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="kafakaproducer"/>
        </constructor-arg>
    </bean>
    <!-- KafkaTemplate used to send messages -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <ref bean="producerFactory"/>
        </constructor-arg>
        <property name="defaultTopic" value="mhb-test"/>
    </bean>
</beans>
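  • Test class
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;

public class SpringKafkaTest {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Load the producer configuration; the path must match where the producer XML is saved
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:config/spring-kafka.xml");
        KafkaTemplate<Integer, String> kafkaTemplate = ctx.getBean("kafkaTemplate", KafkaTemplate.class);
        // Send a long stream of messages (without keys) to the "pipi" topic
        for (int i = 0; i < 1000000000; i++) {
            String message = "Message #" + i;
            kafkaTemplate.send("pipi", message);
            System.out.println("message=" + message);
        }
    }
}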
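
The producer settings from the XML can also be written as a plain Java map, which makes each key easier to see in isolation. The sketch below is an illustration rather than the author's setup: the class name ProducerProps and the build method are hypothetical, and group.id is left out because it is a consumer-side setting. A map of this shape is what the DefaultKafkaProducerFactory constructor accepts.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {

    /** Builds the producer-related entries from producer.xml as a Java map. */
    public static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // broker address, host:port
        props.put("retries", 10);        // resend attempts after a transient failure
        props.put("batch.size", 16384);  // maximum batch size in bytes per partition
        props.put("linger.ms", 1);       // wait up to 1 ms to fill a batch before sending
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting; the address is the one used in the XML
        build("10.168.114.91:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```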

3. Set up the consumer environment

  • Configure the consumer.xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- Consumer properties -->
    <bean id="kafkaConsumerProperites" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="10.168.114.91:9092" />
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- Kafka consumer factory -->
    <bean id="kafkaConsumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="kafkaConsumerProperites" />
        </constructor-arg>
    </bean>
    <!-- Concrete consumer listener class -->
    <bean id="KafkaConsumerListener" class="service.KafkaConsumerListener" />
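    <!-- Container properties (in spring-kafka 2.2 and later, this class is org.springframework.kafka.listener.ContainerProperties) -->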
    <bean id="consumerContainerProperties"
        class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="pipi" />
        <property name="messageListener" ref="KafkaConsumerListener" />
    </bean>
    <!-- Consumer listener container -->
    <bean id="conusmerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="kafkaConsumerFactory" />
        <constructor-arg ref="consumerContainerProperties" />
    </bean>
</beans>
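  • Configure the listener
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@Component("kafkaConsumerListener")
public class KafkaConsumerListener implements MessageListener<Integer, String> {
    // The key type is Integer to match the IntegerDeserializer set in consumer.xml
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // println avoids treating the message text as a printf format string
        System.out.println("Received message=" + record.value());
    }
}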
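  • The consumer is a web project, so the listener starts working as soon as the project starts. Run the producer test class afterwards and the received messages appear in the consumer's console log.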
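
Message values can contain any characters, including '%', so a listener that prints them through a formatter is safer passing the value as an argument than concatenating it into the format string. The following stand-alone sketch shows the difference; the class name SafeLogging and the sample payload are assumptions for illustration and do not require a running broker.

```java
import java.util.IllegalFormatException;

public class SafeLogging {

    /** Formats a received value by passing it as an argument, never as the format string. */
    public static String describe(String value) {
        return String.format("Received message=%s%n", value);
    }

    public static void main(String[] args) {
        String payload = "100%"; // hypothetical message value containing '%'

        // Safe: the payload is only substituted for %s
        System.out.print(describe(payload));

        try {
            // Unsafe: the payload becomes part of the format string
            String.format("Received message=" + payload);
        } catch (IllegalFormatException e) {
            System.out.println("payload rejected as a format string");
        }
    }
}
```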

kafkamonitor, a management tool for Kafka consumers: its web interface shows the topics and the consumers

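  • kafkamonitor is a jar. Put it in the windows directory of the Kafka installation, then write a script containing the following command and run it to start the tool: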
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.168.114.91:2181 --port 8089 --refresh 10.seconds --retain 1.days