Java程式碼Websocket實時更新kafkaConsumer接收的訊息
阿新 • • 發佈:2018-12-22
本人在跟著導師做大資料分析,需要用到kafka,在Producer端,用Python(用的是kafka-python)將讀取的資料經過訓練好的機器學習模型計算之後,傳送到指定的伺服器(IP)的指定topic,因為需要將資料在web端做展示,所以Consumer端用Java語言來寫。這幾天在網上參考了很多帖子,現在經過實際執行無誤之後,決定將程式碼分享給大家,下面是Websocket結合kafka的Java API來實時更新資料的程式碼。
package servlet; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.websocket.EncodeException; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.RemoteEndpoint; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import net.sf.json.JSONArray; @ServerEndpoint("/getServer2") public class websocket { //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。 private static int onlineCount = 0; //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。若要實現服務端與單一客戶端通訊的話,可以使用Map來存放,其中Key可以為使用者標識 private static CopyOnWriteArraySet<websocket> webSocketSet = new CopyOnWriteArraySet<websocket>(); //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料 private Session session; /** * 連線建立成功呼叫的方法 * @param session 可選的引數。session為與某個客戶端的連線會話,需要通過它來給客戶端傳送資料 */ @OnOpen public void onOpen(Session session) throws EncodeException { //1.配置屬性 Properties properties = new Properties(); //配置kafka計算節點的地址 properties.put("bootstrap.servers", "172.17.0.4:9092");//172.17.0.4是Producer傳送給伺服器的IP地址,根據自己的實際情況修改 //設定消費組 properties.put("group.id", "g1"); //是否自動確認offset properties.put("enable.auto.commit", "true"); //接收訊息的反序列化 //key properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //2.建立消費則例項 final KafkaConsumer< String,String> consumer = new KafkaConsumer<String, String>(properties); //訂閱訊息主題 consumer.subscribe(Arrays.asList("test"));//test是topic的名字 //4.釋放資源 //啟動一個守護執行緒,在虛擬機器要掛的時候或者執行不了了 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { if(consumer != null){ consumer.close(); } } })); System.out.println("web"); this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //線上數加1 System.out.println("有新連線加入!當前線上人數為" + getOnlineCount()); final RemoteEndpoint.Basic basic = session.getBasicRemote(); Runnable runnable = new Runnable() { @Override public void run() { try { ConsumerRecords<String, String> records = consumer.poll(10); //拿到的訊息很多,要遍歷 for(ConsumerRecord<String, String> record : records){ basic.sendText(record.value()); System.out.println(record.offset()+"******"+record.key()+"******"+record.value()); } } catch (IOException e) { e.printStackTrace(); } } }; ScheduledExecutorService sc = Executors.newSingleThreadScheduledExecutor(); // 第二個引數為首次執行的延時時間,第三個引數為定時執行的間隔時間 sc.scheduleAtFixedRate(runnable, 1, 10, TimeUnit.SECONDS); } @OnError public void onError(Throwable throwable, Session session) { System.out.println("pathParams:" + session.getPathParameters()); System.out.println("requestParams" + session.getRequestParameterMap()); System.out.print("onError" + throwable.toString()); } /** * 關閉連線時觸發 * * @param relationId * @param userCode * @param session */ @OnClose public void onClose(Session session) { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //線上數減1 System.out.println("pathParams:" + session.getPathParameters()); System.out.println("requestParams" + session.getRequestParameterMap()); System.out.print("onClose "); } /** * 接收前端傳過來的資料。 * 雖然在實現推送邏輯中並不需要接收前端資料,但是作為一個webSocket的教程或叫備忘,還是將接收資料的邏輯加上了。 */ @OnMessage public void onMessage(String message ,Session session){ System.out.println(message + "from " + session.getId()); } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { websocket.onlineCount++; } public static synchronized void subOnlineCount() { websocket.onlineCount--; } } 下面是JSP頁面展示的程式碼: <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <% String path = request.getContextPath(); String socPath="ws://"+request.getServerName()+":"+request.getServerPort()+path+"/"; %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> <script type="text/javascript" src="<%=path %>/js/jquery-1.12.3.js"></script> <script type="text/javascript" src="<%=path %>/js/echarts.min.js"></script> <script type="text/javascript"> $(function(){ var wsuri = "<%=socPath%>getServer2"; if ('WebSocket' in window) websocket = new WebSocket(wsuri); else if ('MozWebSocket' in window) websocket = new MozWebSocket(wsuri); else { alert('當前瀏覽器 Not support websocket') } //連線發生錯誤的回撥方法 websocket.onerror = function () { setMessageInnerHTML("WebSocket連線發生錯誤"); }; //連線成功建立的回撥方法 websocket.onopen = function (event) { setMessageInnerHTML("WebSocket連線成功"); } //接收到訊息的回撥方法 websocket.onmessage = function (event) { setMessageInnerHTML(event.data); } //連線關閉的回撥方法 websocket.onclose = function () { setMessageInnerHTML("WebSocket連線關閉"); } //監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連線,防止連線還沒斷開就關閉視窗,server端會拋異常。 window.onbeforeunload = function () { closeWebSocket(); } //將訊息顯示在網頁上 function setMessageInnerHTML(innerHTML) { document.getElementById('sp').innerHTML += innerHTML + '<br/>'; } //關閉WebSocket連線 function closeWebSocket() { websocket.close(); } function sendMsg() { socket.send("This is a client message "); } //傳送訊息 /* function send() { var message = document.getElementById('text').value; websocket.send(message); } */ <%-- $.ajax({ url: '<%=path %>/cum', type: 'post', success:function(res){ document.getElementById('sp').innerHTML = 'hahahahah'; alert("hahahah") alert(res) } }) --%> }) </script> <body> <span id="sp"></span> </body> </html>