socket 通訊檢測客戶端非正常斷線。
阿新 • • 發佈:2018-12-27
package com.ist.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; /** * Socket收發器 通過Socket傳送資料,並使用新執行緒監聽Socket接收到的資料 * @author songqiang * @createtime 2015-12-15 */ public abstract class MediaTransceiver implements Runnable{ protected Socket socket; protected InetAddress addr; protected String falshId; protected DataInputStream in; protected DataOutputStream out; private boolean runFlag; //是否線上標識 private boolean onlineFlag; /** * 伺服器端例項化 * @param socket */ public MediaTransceiver(Socket socket) { this.socket = socket; this.addr = socket.getInetAddress(); } /** * 監聽Socket接收的資料(新執行緒中執行) */ @Override public void run() { try { //socket輸入流 in = new DataInputStream(this.socket.getInputStream()); //socket輸出流 out = new DataOutputStream(this.socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); runFlag = false; } //無限接收服務端資訊,直到連線埠 while(runFlag){ try{ //接受資料 final String s = in.readUTF(); if(s.equals("1")){ onlineFlag = true; } this.onReceive(addr, s); }catch(EOFException e){ }catch (IOException e){ // 連線被斷開(被動) runFlag = false; } } //斷開連線 try { in.close(); out.close(); socket.close(); in = null; out = null; socket = null; } catch (IOException e) { e.printStackTrace(); } this.onDisconnect(addr); } static void delay() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 開啟Socket收發 * 如果開啟失敗,會斷開連線並回調{@code onDisconnect()} */ public void start(){ runFlag = true; new Thread(this).start(); } /** * 斷開連線(主動) * 連線斷開後,會回撥{@code onDisconnect()} */ public void stop(){ runFlag = false; try { socket.shutdownInput(); in.close(); } catch (Exception e) { e.printStackTrace(); } } //向伺服器傳送falshId public void sendFalshId(String falshId){ if(out!=null){ try{ send("falshId:"+falshId); }catch(Exception e){ e.printStackTrace(); } }else{ if(runFlag){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } sendFalshId(falshId); } } } /** * 向服務端傳送資訊 * @param s * @return */ public boolean send(String s){ if(out!=null){ try{ out.writeUTF(s); out.flush(); return true; }catch(Exception e){ e.printStackTrace(); } } return false; } /** * 檢查socket是否執行 */ public void checkOnLine(){ send("0"); onlineFlag = false; //執行緒等待onlineFlag標識是否改變 Thread checkThread = new Thread(new Runnable() { @Override public void run() { delay(); //標識沒有改變則判斷為離線 if(!onlineFlag){ stop(); } } }); checkThread.start(); } /** * 獲取連線到的Socket地址 * * @return InetAddress物件 */ public InetAddress getInetAddress() { return addr; } public String getFalshId() { return falshId; } public void setFalshId(String falshId) { this.falshId = falshId; } //-------------------------------例項化時實現--------------------------- /** * 接收到資料 * 注意:此回撥是在新執行緒中執行的 * @param addr 連線到的Socket地址 * @param s:收到的字串 */ public abstract void onReceive(InetAddress addr, String s); /** * 連線斷開 * 注意:此回撥是在新執行緒中執行的 * @param addr * 連線到的Socket地址 */ public abstract void onDisconnect(InetAddress addr); }
public abstract class MediaServer implements Runnable{ private static MediaServer mediaServer=null; //監聽埠 private int port; //服務端執行標識 private boolean runFlag; //客戶端響應集合(ConcurrentHashMap執行緒安全解決併發修改問題) private Map results = new ConcurrentHashMap(); //客戶端集合 private Map clinets = new ConcurrentHashMap(); /** * 例項化服務端 * @param port */ private MediaServer(int port) { this.port = port; } //加同步鎖(執行緒安全) private static synchronized void syncInit(){ if(mediaServer==null){ mediaServer = new MediaServer(port) { //伺服器停止 @Override public void onServerStop() { } //接收方法 @Override public void onReceive(MediaTransceiver client, String s) { this.getResults().put(client.falshId, s); delay(1); //System.out.println("接收結果:"+this.toString()+":"+this.getResults().get(client.falshId)); } //斷開連線 @Override public void onDisconnect(MediaTransceiver client) { if(null!=client){ this.getClinets().remove(client.falshId); this.getResults().remove(client.falshId); updateLogoutTime(client.falshId); } } //連線失敗 @Override public void onConnectFailed() { System.out.println("連線失敗!"); } //連線上 @Override public void onConnect(MediaTransceiver client) { } }; mediaServer.start(); } } /** * 獲取伺服器例項與建立例項分開 * 如果在建構函式中丟擲異常,例項將永遠得不到建立 * @return */ public static synchronized MediaServer getInstance(){ if(mediaServer == null){ syncInit(); mediaServer.updateAllOffLine(); } mediaServer.checkOnLine(); return mediaServer; } /** * 伺服器啟動 * 如果啟動失敗,會回撥onServerStop() */ public void start(){ runFlag = true; new Thread(this).start(); } /** * 伺服器停止 * 伺服器停止後,會回撥{@code onServerStop()} */ public void stop(){ runFlag = false; } /** * 監聽埠,接受客戶端連線(新執行緒中執行) */ @Override public void run() { try{ final ServerSocket server = new ServerSocket(port); //無限等待啟動客戶端,直到伺服器關閉 while(runFlag){ try{ final Socket socket = server.accept(); //socket.setSoTimeout(10000);//十秒連線超時 //與客戶端建立連線 startClinet(socket); } catch (IOException e) { // ServerSocket物件創建出錯,伺服器啟動失敗 this.onConnectFailed(); } } //停止伺服器 try{ for(String key:clinets.keySet()){ clinets.get(key).stop(); } clinets.clear(); results.clear(); server.close(); mediaServer=null; }catch (IOException e) { // ServerSocket物件創建出錯,伺服器啟動失敗 e.printStackTrace(); } } catch (IOException e) { // ServerSocket物件創建出錯,伺服器啟動失敗 e.printStackTrace(); } this.onServerStop(); } /** * 啟動客戶端收發 * @param socket */ public void startClinet(Socket socket){ //伺服器端接收物件 MediaTransceiver clinet = new MediaTransceiver(socket) { //接收資訊 @Override public void onReceive(InetAddress addr, String s) { if("0".equals(s)){ this.send("1"); }else{ System.out.println("接收:"+s); if(null!=s && s.startsWith("falshId:")){//新增客戶端 s=s.replace("falshId:", ""); if(s!=null && !"null".equals(s.trim()) && hasFalshId(s)){ this.falshId=s; clinets.put(this.falshId,this); updateOnline(falshId); } }else if(this.falshId!=null){//接受傳送訊息 System.out.println(this.falshId+":"+s); MediaServer.this.onReceive(this, s); }else{ //System.out.println("返回值"+s); } } } //伺服器斷開連線 @Override public void onDisconnect(InetAddress addr) { if(null!=this && null!= this.falshId){ MediaServer.this.onDisconnect(this); } } }; clinet.start(); this.onConnect(clinet); } //向客戶端傳送命令 public boolean send(String falshId,String s){ for(String key:clinets.keySet()){ MediaTransceiver mt = clinets.get(key); if(null!=mt && mt.falshId!=null && mt.falshId.equals(falshId)){ //連線不為空且soket是連線著的 if(null!=mt.socket && mt.socket.isConnected()){ mt.send(s); return true; } } } return false; } //等待客戶端響應 public String getResult(String falshId){ String resultstr = this.getResults().get(falshId); if(null == resultstr){ return ""; }else{//去掉已返回命令 this.getResults().remove(falshId); } return resultstr; } //等待客戶端響應 public Map getResults(){ return results; } public Map getClinets() { return clinets; } //修改離線時間 public void updateLogoutTime(String falshId){ 具體實現 } //修改線上終端 public void updateOnline(String falshId){ 具體實現 } public boolean hasFalshId(String falshId){ 具體實現 } //修改所有終端為離線 public void updateAllOffLine(){ 具體實現 } /** * 檢查是否線上 */ public void checkOnLine(){ for(String clinetId:clinets.keySet()){ MediaTransceiver clinet = clinets.get(clinetId); clinet.checkOnLine(); } } static void delay(int count) { try { Thread.sleep(count*1000); } catch (InterruptedException e) { e.printStackTrace(); } } //---------------------------具體實現交給子類,或是例項化時實現----------------------- /** * 客戶端:連線建立 * 注意:此回撥是在新執行緒中執行的 * @param client 客戶端物件 */ public abstract void onConnect(MediaTransceiver client); /** * 客戶端:連線建立失敗 * 注意:此回撥是在新執行緒中執行的 */ public abstract void onConnectFailed(); /** * 客戶端:收到字串 * 注意:此回撥是在新執行緒中執行的 * @param client 客戶端物件 * @param s 字串 */ public abstract void onReceive(MediaTransceiver client, String s); /** * 客戶端:連線斷開 * 注意:此回撥是在新執行緒中執行的 * @param client MediaCline客戶端物件 */ public abstract void onDisconnect(MediaTransceiver client); /** * 伺服器停止 * 注意:此回撥是在新執行緒中執行的 */ public abstract void onServerStop();
package com.ist.socket; import java.net.InetAddress; import java.net.Socket; public abstract class MediaClinet implements Runnable{ private int port; private String hostIP; private boolean mBlnNetIsOk = false; private MediaTransceiver transceiver; //終端標識碼 private String falshId; public MediaClinet(String falshId) { this.falshId = falshId; } /** * 建立連線 * 連線的建立將在新執行緒中進行 * 連線建立成功,回撥{@code onConnect()} * 連線建立失敗,回撥{@code onConnectFailed()} * @param hostIP 伺服器主機IP * @param port 埠 */ public void connect(String hostIP, int port) { this.hostIP = hostIP; this.port = port; new Thread(this).start(); } private void connect(){ try{ Socket socket = new Socket(hostIP, port); transceiver = new MediaTransceiver(socket) { @Override public void onReceive(InetAddress addr, String s) { if(s.equals("0")){ this.send("1"); } MediaClinet.this.onReceive(transceiver, s); } @Override public void onDisconnect(InetAddress addr) { mBlnNetIsOk = false; MediaClinet.this.onDisconnect(transceiver,falshId); } }; transceiver.start(); mBlnNetIsOk = true; transceiver.sendFalshId(falshId); this.onConnect(transceiver,falshId); } catch (Exception e) { e.printStackTrace(); this.onConnectFailed(); } } @Override public void run(){ connect(); try { Thread.sleep(4000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } while (true) { if (mBlnNetIsOk == false) { connect(); } try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private boolean sendPacket(String string) { // TODO Auto-generated method stub if(transceiver!=null){ return transceiver.send(string); } return false; } /** * 斷開連線 * 連線斷開,回撥{@code onDisconnect()} */ public void disconnect() { if (transceiver != null) { transceiver.stop(); transceiver = null; } } /** * 獲取Socket收發器 * * @return 未連線則返回null */ public MediaTransceiver getTransceiver() { return isConnected() ? transceiver : null; } /** * 判斷是否連線 * * @return 當前處於連線狀態,則返回true */ public boolean isConnected() { return mBlnNetIsOk; } /** * 連線建立 * @param transceiver SocketTransceiver物件 */ public abstract void onConnect(MediaTransceiver transceiver,String falshId); /** * 連線建立失敗 */ public abstract void onConnectFailed(); /** * 接收到資料 * 注意:此回撥是在新執行緒中執行的 * @param transceiver Socket收發物件 * @param s 字串 */ public abstract void onReceive(MediaTransceiver transceiver, String s); /** * 連線斷開 * 注意:此回撥是在新執行緒中執行的 * @param transceiver Socket收發物件 */ public abstract void onDisconnect(MediaTransceiver transceiver,String falshId); }
這個socket 通訊類大體框架是借鑑某個高手的部落格裡面的,具體我就不記得了(我只能說對不起了)。我大概說一下我程式碼裡面的功能:
1.MediaServer 採用的是單例模式,沒當有一個clinet 連線的時候就會建立一個MediaTransceiver(接收者)物件。
2.MediaClinet 建立一個MediaTransceiver(接收者)物件併發送一個標識自己的falshid。(因為ip是會變的,不會做為唯一鍵)
3.也是socket比較難處理的一個問題:非正常斷線,就是你拔掉網線的時候socket是檢查不到斷開的。在網上看了很多帖子:(1)設定超時時間
(2)設定keepAlive (3)設定心跳包。第一個超時時間是read的阻塞時間,並不是說socket的運行了這麼久然後就超時斷開。所以超時和斷開沒關係,
第二個keepAlive是十二分鐘檢測一次伺服器的活動狀態,個人覺得有點久。第三個正常斷開還行,非正常斷開(拔網線)就檢測不出了。
所以我通過傳送0給客戶端等待客戶端響應1回來,沒有則判斷為離線。等待響應用的是執行緒,不然如果斷線則會卡在read方法那裡。
最後java是開源,程式碼共享,互相進步,延續這種精神。如果有疑問可以加qq群124795403 交流。我不是大牛,我只是程式碼的搬運工。