Integrating Spring with Kafka to implement a message queue
阿新 • Published: 2019-01-03
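1. Set up the Kafka runtime environment. Download: http://kafka.apache.org/downloads
- After downloading, edit the server.properties file in the config directory of the installation: replace IP in listeners=PLAINTEXT://IP:9092 with the server's IP address, and update the ZooKeeper registration address (zookeeper.connect)
- Start Kafka with this command: .\bin\windows\kafka-server-start.bat .\config\server.properties
- Test whether messages can be sent (replace IP with the broker's address):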
bin\windows\kafka-console-producer.bat --broker-list IP:9092 --topic pipi
bin\windows\kafka-console-consumer.bat --bootstrap-server IP:9092 --topic pipi --from-beginning
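If the console tools cannot connect, it can help to confirm first that the broker port is reachable at all. The following is a minimal sketch using only the JDK, not part of the original setup; the class name BrokerPortCheck and the localhost:9092 defaults are assumptions for illustration, so pass the host and port configured in server.properties.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults; use the listener address from server.properties
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 3000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; the console producer and consumer test above is still what confirms that messages actually flow.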
2. Set up the producer environment
- Configure the producer.xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" >
<!-- Producer properties -->
<bean id="kafakaproducer" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="10.168.114.91:9092"/>
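<!-- group.id is a consumer-only setting; the producer does not use it and logs a warning about an unknown config -->
<entry key="group.id" value="0"/>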
<entry key="retries" value="10"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<!-- Kafka producer factory -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="kafakaproducer"/>
</constructor-arg>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<ref bean="producerFactory"/>
</constructor-arg>
<property name="defaultTopic" value="mhb-test"/>
</bean>
</beans>
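The properties map in this XML can also be built in plain Java, which may be easier to read or unit-test. The sketch below uses only the JDK; the class name ProducerProps and its build method are hypothetical. It leaves out group.id on the assumption that it is not needed, since that key is a consumer setting.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerProps {
    /** Builds the producer settings used in the XML above (group.id omitted). */
    static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("retries", 10);        // retry failed sends up to 10 times
        props.put("batch.size", 16384);  // batch size in bytes
        props.put("linger.ms", 1);       // wait up to 1 ms to fill a batch
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("10.168.114.91:9092");
        // With spring-kafka on the classpath, this map would be passed to
        // new DefaultKafkaProducerFactory<Integer, String>(props)
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Whether to keep the settings in XML or in Java is a matter of taste; the XML form shown in this post keeps everything in one Spring context file.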
- Test class
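import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;

public class SpringKafkaTest {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Load the producer configuration; adjust the path to where the XML file above is stored
        try (ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:config/spring-kafka.xml")) {
            KafkaTemplate<Integer, String> kafkaTemplate = ctx.getBean("kafkaTemplate", KafkaTemplate.class);
            for (int i = 0; i < 1000000000; i++) {
                String message = "Message no. " + i;
                // send() is asynchronous; no key is given, so the record key is null
                kafkaTemplate.send("pipi", message);
                System.out.println("message=" + message);
            }
            // Push any buffered records to the broker before the context closes
            kafkaTemplate.flush();
        }
    }
}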
3. Set up the consumer environment
- Configure the consumer.xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Consumer properties -->
<bean id="kafkaConsumerProperites" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="10.168.114.91:9092" />
<entry key="group.id" value="0"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- Kafka consumer factory -->
<bean id="kafkaConsumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="kafkaConsumerProperites" />
</constructor-arg>
</bean>
<!-- The consumer's listener implementation -->
<bean id="KafkaConsumerListener" class="service.KafkaConsumerListener" />
<!-- Container properties: the topic to subscribe to and the listener to invoke -->
<bean id="consumerContainerProperties"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="pipi" />
<property name="messageListener" ref="KafkaConsumerListener" />
</bean>
<!-- Consumer listener container -->
<bean id="conusmerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="kafkaConsumerFactory" />
<constructor-arg ref="consumerContainerProperties" />
</bean>
</beans>
- Configure the listener
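package service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@Component("kafkaConsumerListener")
public class KafkaConsumerListener implements MessageListener<Integer, String> {
    // Called by the listener container for each record received; the key type matches IntegerDeserializer
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // println avoids treating the message text as a printf format string
        System.out.println("Received message=" + record.value());
    }
}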
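One detail to watch when a listener prints record values: System.out.printf treats its first argument as a format string, so a payload that contains a % character can throw instead of printing. The sketch below illustrates the pitfall with the JDK alone; the class name PrintfPitfall and the sample payload are made up for illustration.

```java
import java.util.IllegalFormatException;

class PrintfPitfall {
    /** Returns true if using text as a format string (with no arguments) throws. */
    static boolean failsAsFormat(String text) {
        try {
            String.format(text);
            return false;
        } catch (IllegalFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String payload = "progress: 100% done"; // hypothetical message value
        System.out.println("fails as format string: " + failsAsFormat(payload));
        // Safe: the payload is passed as an argument, not used as the format
        System.out.printf("Received message=%s%n", payload);
    }
}
```

Passing the value as a %s argument, or using println with string concatenation, keeps the listener from failing on such messages.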
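- The consumer is a web project, so the listener starts working as soon as the application starts. Then run the producer test class, and the received messages appear in the consumer's console log.
kafkamonitor (KafkaOffsetMonitor) is a management tool for Kafka consumers; its web interface shows the message topics and the consumers.
- kafkamonitor is a single jar. Place it in the windows directory of the Kafka installation, then write a script with the following command and run it to start the tool: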
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.168.114.91:2181 --port 8089 --refresh 10.seconds --retain 1.days