Kafka+SpringMVC+Maven應用示例--轉載 Kafka+SpringMVC+Maven應用示例
Kafka+SpringMVC+Maven應用示例
本文藉助主流SpringMVC框架向大家介紹如何在具體應用中簡單快捷的使用kafka。kafka、maven以及SpringMVC在現在的企業級應用中都佔據著非常重要的地位,所以本文將三者結合起來也可以方便大家進一步熟悉基於Maven的SpringMVC框架搭建。
專案展示
國際慣例,首先先向大家展示一下專案最終的執行效果:
當專案正常啟動後,在瀏覽器中輸入:http://127.0.0.1:8080/kafkaSpringMVC/welcome 進入歡迎介面:
然後點選Send a Message 進入訊息傳送頁面:
從上面可以看出,傳送的訊息是當前系統的時間(當然你也可以修改成為自己感冒的訊息),點選Submit後將訊息傳送到kafka叢集伺服器,然後自動返回到Welcome歡迎介面。在歡迎介面點選Get a Message:
從上述介面中我們可以看見頁面中已經獲取到了剛才傳送的訊息,點選RETURN HOME,返回歡迎介面,好啦專案展示就這麼簡單。
開發環境
- 作業系統:MacOS 10.12.3(同樣適用於Linux系統和Windows系統)
- JDK: java version "1.8.0_121"
- 開發平臺:Eclipse Neon.2 Release (4.6.2)
- WEB容器:wildfly-8.1.0.Final
- zookeeper: zookeeper-3.4.9
- kafka: kafka-2.10-0.10.2.0
- maven: Eclipse Neon.2 Release(4.6.2)自帶maven工具,版本為3.3.9
專案框架
專案框架如下圖所示:
專案開發流程
首先搭建Maven Web Project框架,搭建過程可參考我的另一篇隨筆
框架搭建完成後,下面就需要引入Spring MVC 所需要的jar包和kafka客戶端開發需要的jar包。本示例採用的Spring 版本為4.3.6.REALEASE,使用的kafka客戶端版本為0.10.2.0。下面通過修改pom.xml檔案來引入外部依賴包:
pom.xml
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 <groupId>com.unionpay</groupId> 5 <artifactId>kafkaSpringMVC</artifactId> 6 <packaging>war</packaging> 7 <version>0.0.1-SNAPSHOT</version> 8 <name>kafkaSpringMVC Maven Webapp</name> 9 <url>http://maven.apache.org</url> 10 11 <properties> 12 <springframework>4.3.6.RELEASE</springframework> 13 </properties> 14 15 <dependencies> 16 <dependency> 17 <groupId>junit</groupId> 18 <artifactId>junit</artifactId> 19 <version>3.8.1</version> 20 <scope>test</scope> 21 </dependency> 22 23 <dependency> 24 <groupId>org.springframework</groupId> 25 <artifactId>spring-core</artifactId> 26 <version>${springframework}</version> 27 </dependency> 28 29 <dependency> 30 <groupId>org.springframework</groupId> 31 <artifactId>spring-context</artifactId> 32 <version>${springframework}</version> 33 </dependency> 34 35 <dependency> 36 <groupId>org.springframework</groupId> 37 <artifactId>spring-tx</artifactId> 38 <version>${springframework}</version> 39 </dependency> 40 41 <dependency> 42 <groupId>org.springframework</groupId> 43 <artifactId>spring-webmvc</artifactId> 44 <version>${springframework}</version> 45 </dependency> 46 47 <dependency> 48 <groupId>org.springframework</groupId> 49 <artifactId>spring-web</artifactId> 50 <version>${springframework}</version> 51 </dependency> 52 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-jms</artifactId> 56 <version>${springframework}</version> 57 </dependency> 58 59 <dependency> 60 <groupId>org.apache.kafka</groupId> 61 <artifactId>kafka-clients</artifactId> 62 <version>0.10.2.0</version> 63 </dependency> 64 65 </dependencies> 66 <build> 67 <finalName>kafkaSpringMVC</finalName> 68 </build> 69 </project>
寫完pom.xml檔案後,儲存。然後右鍵專案名稱,選擇Maven->Update Project... 更新專案,引入jar包。
Update Project完成後,可以在maven依賴包裡看見剛才引入的本專案需要的jar包:
接下來編寫web.xml檔案:
web.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns="http://java.sun.com/xml/ns/javaee" 4 xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" 5 id="WebApp_ID" version="3.0"> 6 7 <context-param> 8 <param-name>contextConfigLocation</param-name> 9 <param-value>classpath:kafka-beans.xml</param-value> 10 </context-param> 11 12 <listener> 13 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> 14 </listener> 15 16 <servlet> 17 <servlet-name>springDispatcherServlet</servlet-name> 18 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> 19 <init-param> 20 <param-name>contextConfigLocation</param-name> 21 <param-value>classpath:spring-mvc-dispatcher.xml</param-value> 22 </init-param> 23 <load-on-startup>1</load-on-startup> 24 </servlet> 25 26 <servlet-mapping> 27 <servlet-name>springDispatcherServlet</servlet-name> 28 <url-pattern>/</url-pattern> 29 </servlet-mapping> 30 31 </web-app>
簡單明瞭,從web.xml描述來看,MVC對映是通過spring-mvc-dispatcher.xml檔案說明的,而專案中用到的所有的model 則是通過kafka-beans.xml檔案注入的。那麼下面分別在resources資料夾下建立這兩個xml配置檔案:
spring-mvc-dispatcher.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> <context:component-scan base-package="com.unionpay.controller"></context:component-scan> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="prefix" value="/WEB-INF/views/"></property> <property name="suffix" value=".jsp"></property> </bean> </beans>
就這麼簡單。由於kafka-beans.xml主要作用是配置注入的beans,所以按照程式,還是先建立好Model才符合邏輯。
在src/main/java路徑下分別建立三個package包:com.unionpay.producer、com.unionpay.consumer、com.unionpay.controller。從字面意思很容易理解,producer包主要用於存放生產者,consumer包主要用於存放消費者,controller包主要用於存放邏輯控制類。
完成後在com.unionpay.producer包下建立KafkaProducerDemo.java檔案(最好不要命名為Producer或者KafkaProducer,這樣會與引入的jar包中原有的類重名):
KafkaProducerDemo.java
1 package com.unionpay.producer; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 8 public class KafkaProducerDemo { 9 10 Properties properties; 11 12 public KafkaProducerDemo(Properties properties) { 13 super(); 14 this.properties = properties; 15 } 16 17 public Properties getProperties() { 18 return properties; 19 } 20 21 public void setProperties(Properties properties) { 22 this.properties = properties; 23 } 24 25 public void sendMessage(String msg) { 26 27 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); 28 29 ProducerRecord<String, String> record = new ProducerRecord<String, String>(properties.getProperty("topic"), 30 msg); 31 producer.send(record); 32 33 producer.close(); 34 35 } 36 37 }
Properties屬性主要是為了配置KafkaProducer類,具體資訊通過Spring注入,這樣可以顯得更加高大上和靈活。後面的配置檔案中我們可以看得到具體的配置資訊。
同樣,在com.unionpay.consumer包下面新建類KafkaConsumerDemo.java:
1 package com.unionpay.consumer; 2 3 import java.util.Arrays; 4 import java.util.Properties; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.KafkaConsumer; 9 10 public class KafkaConsumerDemo { 11 12 private Properties props; 13 14 public KafkaConsumerDemo(Properties props) { 15 super(); 16 this.props = props; 17 } 18 19 public Properties getProps() { 20 return props; 21 } 22 23 public void setProps(Properties props) { 24 this.props = props; 25 } 26 27 public String receive(){ 28 29 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); 30 consumer.subscribe(Arrays.asList(props.getProperty("topic"))); 31 32 String msg = ""; 33 while(true){ 34 ConsumerRecords<String,String> consumerRecords = consumer.poll(100); 35 for(ConsumerRecord<String, String> consumerRecord:consumerRecords){ 36 msg += consumerRecord.value(); 37 } 38 consumer.close(); 39 return msg; 40 } 41 } 42 43 }
也是基於同樣的原因,KafkaConsumer的配置資訊properties也是通過Spring配置檔案注入。
當Producer和Consumer編寫完成後,就可以編寫kafka-beans.xml檔案啦:
kafka-beans.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:mvc="http://www.springframework.org/schema/mvc" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 6 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd 7 http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> 8 9 10 <context:component-scan base-package="com.unionpay.producer"></context:component-scan> 11 <context:component-scan base-package="com.unionpay.consumer"></context:component-scan> 12 13 14 <bean id="kafkaProducerDemo" class="com.unionpay.producer.KafkaProducerDemo"> 15 <property name="properties"> 16 <props> 17 <prop key="topic">my-replicated-topic</prop> 18 <prop key="bootstrap.servers">127.0.0.1:9092</prop> 19 <prop key="acks">all</prop> 20 <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer 21 </prop> 22 <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer 23 </prop> 24 <prop key="buffer.memory">33554432</prop> 25 </props> 26 27 </property> 28 </bean> 29 30 <bean id="kafkaConsumerDemo" class="com.unionpay.consumer.KafkaConsumerDemo"> 31 <property name="props"> 32 <props> 33 <prop key="topic">my-replicated-topic</prop> 34 <prop key="bootstrap.servers">127.0.0.1:9092</prop> 35 <prop key="group.id">group1</prop> 36 <prop key="enable.auto.commit">true</prop> 37 <prop key="auto.commit.interval.ms">1000</prop> 38 <prop key="session.timeout.ms">30000</prop> 39 <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer 40 </prop> 41 <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer 42 </prop> 43 </props> 44 45 </property> 46 </bean> 47 </beans>
kafka的主要配置一共有三種:broker、producer和consumer,對於客戶端來說就是後兩種啦。而後兩種的配置項從官方檔案可以知道,每個都至少有30多種配置內容。通過上面這種注入配置方式的話,在<props><props>中隨便新增配置內容,是不是很靈活呢^_^
下面在com.unionpay.controller包下編寫Controller類,控制業務邏輯:
KafkaController.java
package com.unionpay.controller; import java.text.SimpleDateFormat; import java.util.Date; import javax.annotation.Resource; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.servlet.ModelAndView; import com.unionpay.consumer.KafkaConsumerDemo; import com.unionpay.producer.KafkaProducerDemo; @Controller public class KafkaController { @Resource(name = "kafkaProducerDemo") KafkaProducerDemo producer; @Resource(name = "kafkaConsumerDemo") KafkaConsumerDemo consumer; @RequestMapping(value = "/welcome") public ModelAndView welcome() { System.out.println("--------welcome--------"); ModelAndView mv = new ModelAndView(); mv.setViewName("welcome"); return mv; } @RequestMapping(value = "/sendmessage", method = RequestMethod.GET) public ModelAndView sendMessage() { System.out.println("--------sendmessage--------"); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String now = sdf.format(date); ModelAndView mv = new ModelAndView(); mv.addObject("time", now); mv.setViewName("kafka_send"); return mv; } @RequestMapping(value = "/onsend", method = RequestMethod.POST) public ModelAndView onsend(@RequestParam("message") String msg) { System.out.println("--------onsend--------"); producer.sendMessage(msg); ModelAndView mv = new ModelAndView(); mv.setViewName("welcome"); return mv; } @RequestMapping(value = "/receive") public ModelAndView receive() { System.out.println("--------receive--------"); String msg = consumer.receive(); ModelAndView mv = new ModelAndView(); mv.addObject("msg", msg); mv.setViewName("kafka_receive"); return mv; } }
到目前為止,我們的MVC中已經完成了兩個啦(M and C),下面編寫最後的三個JSP檔案。從解析配置檔案(spring-mvc-dispatcher.xml)來看,我們的JSP頁面應該建立在/WEB-INF/views/目錄下,所以我們首先在/WEB-INF/目錄下建立views資料夾。然後在該資料夾下面建立三個jsp檔案:
welcome.jsp
1 <%@ page language="java" contentType="text/html; charset=UTF-8" 2 pageEncoding="UTF-8"%> 3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> 4 <html> 5 <head> 6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> 7 <title>welcome</title> 8 </head> 9 <body> 10 <h1>Welcome</h1> 11 <h2><a href="sendmessage">Send a Message</a></h2> 12 <h2><a href="receive">Get a Message</a></h2> 13 </body> 14 </html>
kafka_send.jsp
1 <%@ page language="java" contentType="text/html; charset=UTF-8" 2 pageEncoding="UTF-8"%> 3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> 4 <html> 5 <head> 6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> 7 <title>kafka_send</title> 8 </head> 9 <body> 10 <h1>Send a Message</h1> 11 <form action="onsend" method="post"> 12 MessageText:<textarea name="message">${time}</textarea> 13 <br> 14 <input type="submit" value="Submit"> 15 </form> 16 17 <h2><a href="welcome">RETURN HOME</a></h2> 18 19 </body> 20 </html>
kafka-receive.jsp
1 <%@ page language="java" contentType="text/html; charset=UTF-8" 2 pageEncoding="UTF-8"%> 3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> 4 <html> 5 <head> 6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> 7 <title>kafka_receive</title> 8 </head> 9 <body> 10 11 <h1>Kafka_Reveive!!!</h1> 12 <h2>Receive Message : ${msg}</h2> 13 <h2><a href="welcome">RETURN HOME</a></h2> 14 </body> 15 </html>
啊,終於大功告成啦。下面就要品嚐我們的勞動果實啦。將專案部署在容器裡,然後首先啟動zookeeper叢集伺服器,然後啟動kafka叢集伺服器:
//啟動zookeeper叢集伺服器 cd ~/DevelopEnvironment/zookeeper-3.4.9-kafka/bin ./zkServer.sh start //啟動kafka叢集伺服器 cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin ./kafka-server-start.sh ../config/server.properties ./kafka-server-start.sh ../config/server-1.properties ./kafka-server-start.sh ../config/server-2.properties
然後通過Eclipse啟動容器:
從上面終端打印出來的資訊可以知道,部署成功啦。下面就要輪迴到本文開頭啦,在網頁位址列中輸入:http://127.0.0.1:8080/kafkaSpringMVC/welcome 進入歡迎介面,然後按照開始描述的操作進行操作,看看能否成功傳送和接受訊息呢?
怎麼樣,你成功了嗎?反正我是成功了,也希望你也成功啦。如果出現什麼錯誤的話也千萬彆著急,去根據報錯資訊找找原因,因為你也馬上就要成功啦。
原始碼下載:kafkaSpringMVC.zip
參考文獻