心跳檢測的思路及程式碼
外網服務端儲存內網服務端會話的有效性以及平臺上監控所有內網服務端的網路狀況,模仿心跳機制實現,這裡在做一點敘訴,關於思路和具體實現。
在很多的平臺應用中,都有這樣的需求,平臺內包括多個子系統或者屬於其管控範圍內的其他平臺,需要對這些系統進行統一的監控,來檢視當前的執行狀態或者其他執行資訊,我們的應用也有這樣的一個情況,需要再外網服務端(平臺)上監控,其下執行的多個內網服務端的網路狀況,查閱了寫資料後確立了2種可實現的方式。
1:輪詢機制
2:心跳機制
先簡單介紹一下,
輪詢:概括來說是服務端定時主動的去與要監控狀態的客戶端(或者叫其他系統)通訊,詢問當前的某種狀態,客戶端返回狀態資訊,客戶端沒有返回或返回錯誤、失效資訊、則認為客戶端已經宕機,然後服務端自己內部把這個客戶端的狀態儲存下來(宕機或者其他),如果客戶端正常,那麼返回正常狀態,如果客戶端宕機或者返回的是定義的失效狀態那麼當前的客戶端狀態是能夠及時的監控到的,如果客戶端宕機之後重啟了那麼當服務端定時來輪詢的時候,還是可以正常的獲取返回資訊,把其狀態重新更新。
心跳:最終得到的結果是與輪詢一樣的但是實現的方式有差別,心跳不是服務端主動去發信息檢測客戶端狀態,而是在服務端儲存下來所有客戶端的狀態資訊,然後等待客戶端定時來訪問服務端,更新自己的當前狀態,如果客戶端超過指定的時間沒有來更新狀態,則認為客戶端已經宕機或者其狀態異常。
心跳機制與輪詢的比較,在我們的應用中,採用的是心跳,這樣一是避免服務端的壓力,二是靈活好控制,上一篇文章中提到過,我們的外網服務端(服務端)不知道內網服務端(客戶端)的地址,有雖然有儲存客戶端的socket會話,但是客戶端宕機會話就失效了。所以只能等著他主動來報告狀態。
在來說一下實現方式,這個很簡單,就是一個思路問題。
首先,客戶端(內網服務端)啟動後,帶著自己的識別符號與服務端建立socket連線,服務端快取下來對應資訊(上一篇文章中已經實現過了),然後在通過socket流,定時傳送當前資訊訊息到服務端(外網伺服器端)某個介面,服務端收到後更新當前的客戶端的狀態,比如(會話地址,識別符號,網路的活躍狀態,連線時間,心跳時間),本次來更新的時間就是心跳時間,然後服務端還有一個定時器,定時檢查所有快取的客戶端會話集合,將其中心跳時間與當前時間進行對比,如果超過指定的時間還沒有來更新則認為該客戶端的網路出現異常或者宕機,然後更新該客戶端的網路狀態。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.DocumentHelper; import cn.edu.zju.cst.mina.im.server.entity.User; import cn.edu.zju.cst.mina.im.server.handler.ServerControler; public class UserStateManage extends Thread { //線上使用者狀態列表 static HashMap<Integer, UserState> userStateList = new HashMap<Integer, UserState>(); Object hashLock = new Object(); //當前的連線數和工作執行緒數 static int workThreadNum = 0; static int socketConnect = 0; private ServerSocket serverSocket; //伺服器IP private String host = "10.82.81.79"; //伺服器埠 private int stateReportPort = 60001; //設定心跳包的結束標記 String endFlag = "</protocol>"; CharSequence csEndFlag = endFlag.subSequence(0, 10); //掃描間隔 private int scanTime = 1800; @Override public void run() { //繫結埠,並開始偵聽使用者的心跳包 serverSocket = startListenUserReport(stateReportPort); if(serverSocket == null){ System.out.println("【建立ServerSocket失敗!】"); return; } //啟動掃描執行緒 Thread scanThread = new Thread(new scan()); scanThread.start(); //等待使用者心跳包請求 while(true){ Socket socket = null; try { socketConnect = socketConnect + 1; //接收客戶端的連線 socket = serverSocket.accept(); //為該連線建立一個工作執行緒 Thread workThread = new Thread(new Handler(socket)); //啟動工作執行緒 workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } /** * 建立一個ServerSocket來偵聽使用者心跳包請求 * @param port 指定的伺服器端的埠 * @return 返回ServerSocket * @author dream */ public ServerSocket startListenUserReport(int port){ try { ServerSocket serverSocket = new ServerSocket(); if(!serverSocket.getReuseAddress()){ serverSocket.setReuseAddress(true); } serverSocket.bind(new InetSocketAddress(host,port)); System.out.println("【開始在"+serverSocket.getLocalSocketAddress()+"上偵聽使用者的心跳包請求!】"); return serverSocket; } catch (IOException e) { System.out.println("【埠"+port+"已經被佔用!】"); if (serverSocket != null) { if (!serverSocket.isClosed()) { try { serverSocket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } } return serverSocket; } //工作執行緒類 class Handler implements Runnable{ private Socket socket; UserState us = null; User newUser = null; private int userId; private int userState; /** * 建構函式,從呼叫者那裡取得socket * @param socket 指定的socket * @author dream */ public Handler(Socket socket){ this.socket = socket; } /** * 從指定的socket中得到輸入流 * @param socket 指定的socket * @return 返回BufferedReader * @author dream */ private BufferedReader getReader(Socket socket){ InputStream is = null; BufferedReader br = null; try { is = socket.getInputStream(); br = new BufferedReader(new InputStreamReader(is)); } catch (IOException e) { e.printStackTrace(); } return br; } public void run() { try{ workThreadNum = workThreadNum +1; System.out.println("【第"+workThreadNum+"個的連線:"+socket.getInetAddress()+":"+socket.getPort()+"】"); BufferedReader br = getReader(socket); String meg = null; StringBuffer report = new StringBuffer(); while ((meg = br.readLine()) != null) { report.append(meg); if (meg.contains(csEndFlag)) { us = getReporterUserState(meg, socket); synchronized (hashLock) { userStateList.put(userId, us); } } } }catch(IOException e){ System.out.println("【客戶:"+newUser.getUser_id()+"已經斷開連線!】"); userStateList.remove( userId ); announceStateChange( userId , -1); }finally{ if(socket != null){ try { //斷開連線 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } private UserState getReporterUserState(String meg , Socket socket){ UserState us = new UserState(); try { Document requestDoc = DocumentHelper.parseText(meg); newUser = ServerControler.parseXmlToUserState(requestDoc,socket); userId = newUser.getUser_id(); userState = newUser.getUser_state(); us.setFlag(2); us.setUser_state( userState ); us.setUser_id( userId ); us.setUser_ip(newUser.getUser_ip()); us.setUser_port(newUser.getUser_port()); } catch (DocumentException e) { System.out.println("【來自客戶端的資訊不是一個合法的心跳包協議】"); } return us; } } //掃描執行緒 class scan implements Runnable{ public void run() { while (true) { System.out.println("*******"+new Date()+":掃描執行緒開始掃描"+"*******"); synchronized (hashLock) { if(!userStateList.isEmpty()){ //遍歷線上使用者列表 for (Map.Entry<Integer, UserState> entry : userStateList.entrySet()) { int flag = entry.getValue().getFlag(); if ( (flag - 1) < 0) { //在這裡通知該使用者的好友其狀態發生改變 // announceStateChange(entry.getKey() , 0); }else{ entry.getValue().setFlag(flag - 1); userStateList.put(entry.getKey(), entry.getValue()); } System.out.println(entry.getKey() + "-->" + entry.getValue().toString()); } }else{ System.out.println("現在還沒有線上使用者!"); } } //實現定時掃描 try { sleep(scanTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } private void announceStateChange(int userId , int state){ System.out.println("通知其好友!"); } /** * 查詢一個使用者是否線上 * @param userId 指定要查詢狀態的使用者的ID * @return true 線上; false 不線上; * @author dream */ public boolean isAlive(int userId){ synchronized (hashLock) { return userStateList.containsKey(userId); } } /** * 返回指定使用者ID的狀態 * @param userId 指定要查詢狀態的使用者的ID * @return >0 該使用者線上; -1 該使用者離線 * @author dream */ public int getUserState(int userId){ synchronized (hashLock) { if(userStateList.containsKey(userId)){ return userStateList.get(userId).getUser_state(); }else{ return -1; } } } public Object getHashLock() { return hashLock; } public void setHashLock(Object hashLock) { this.hashLock = hashLock; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStateReportPort() { return stateReportPort; } public void setStateReportPort(int stateReportPort) { this.stateReportPort = stateReportPort; } public String getEndFlag() { return endFlag; } public void setEndFlag(String endFlag) { this.endFlag = endFlag; } public int getScanTime() { return scanTime; } public void setScanTime(int scanTime) { this.scanTime = scanTime; } public static HashMap<Integer, UserState> getUserStateList() { return userStateList; } public static int getWorkThreadNum() { return workThreadNum; } public static int getSocketConnect() { return socketConnect; } //測試本函式的main函式 public static void main(String arg[]){ UserStateManage usm = new UserStateManage(); usm.start(); } }