1. 程式人生 > >socket 通訊檢測客戶端非正常斷線。

socket 通訊檢測客戶端非正常斷線。

package com.ist.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;

/**
 *  Socket收發器 通過Socket傳送資料,並使用新執行緒監聽Socket接收到的資料
 * @author songqiang
 * @createtime 2015-12-15
 */
public abstract class MediaTransceiver implements Runnable{

	protected Socket socket;
	protected InetAddress addr;
	protected String falshId;
	protected DataInputStream in;
	protected DataOutputStream out;
	private boolean runFlag;
	//是否線上標識
	private boolean onlineFlag;
	/**
	 * 伺服器端例項化
	 * @param socket
	 */
	public MediaTransceiver(Socket socket) {
		this.socket = socket;
		this.addr = socket.getInetAddress();
	}
	/**
	 * 監聽Socket接收的資料(新執行緒中執行)
	 */
	@Override
	public void run() {
		try {
			//socket輸入流
			in = new DataInputStream(this.socket.getInputStream());
			//socket輸出流
			out = new DataOutputStream(this.socket.getOutputStream());
		} catch (IOException e) {
			e.printStackTrace();
			runFlag = false;
		}
		//無限接收服務端資訊,直到連線埠
		while(runFlag){
			try{
				//接受資料
				final String s = in.readUTF();
				if(s.equals("1")){
					onlineFlag = true;
				}
				this.onReceive(addr, s);
			}catch(EOFException e){
				
			}catch (IOException e){
				// 連線被斷開(被動)
				runFlag = false;
			}
		}
		//斷開連線
		try {
			in.close();
			out.close();
			socket.close();
			in = null;
			out = null;
			socket = null;
		} catch (IOException e) {
			e.printStackTrace();
		}
		this.onDisconnect(addr);
	}
	static void delay() {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	/**
	 * 開啟Socket收發
	 * 如果開啟失敗,會斷開連線並回調{@code onDisconnect()}
	 */
	public void start(){
		runFlag = true;
		new Thread(this).start();
	}

	/**
	 * 斷開連線(主動)
	 * 連線斷開後,會回撥{@code onDisconnect()}
	 */
	public void stop(){
		runFlag = false;
		try {
			socket.shutdownInput();
			in.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	//向伺服器傳送falshId
	public void sendFalshId(String falshId){
		if(out!=null){
			try{
				send("falshId:"+falshId);
			}catch(Exception e){
				e.printStackTrace();
			}
		}else{
			if(runFlag){
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				sendFalshId(falshId);
			}
		}
	}
	/**
	 * 向服務端傳送資訊
	 * @param s
	 * @return
	 */
	public boolean send(String s){
		if(out!=null){
			try{
				out.writeUTF(s);
				out.flush();
				return true;
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		return false;
	}
	/**
	 * 檢查socket是否執行
	 */
	public void checkOnLine(){
		send("0");
		onlineFlag = false;
		//執行緒等待onlineFlag標識是否改變
		Thread checkThread = new Thread(new Runnable() {		
			@Override
			public void run() {
				delay();
				//標識沒有改變則判斷為離線
				if(!onlineFlag){
					stop();
				}
			}
		});
		checkThread.start();
	}
	/**
	 * 獲取連線到的Socket地址
	 * 
	 * @return InetAddress物件
	 */
	public InetAddress getInetAddress() {
		return addr;
	}
	
	public String getFalshId() {
		return falshId;
	}

	public void setFalshId(String falshId) {
		this.falshId = falshId;
	}
	//-------------------------------例項化時實現---------------------------
	/**
	 * 接收到資料
	 * 注意:此回撥是在新執行緒中執行的
	 * @param addr 連線到的Socket地址
	 * @param s:收到的字串
	 */
	public abstract void onReceive(InetAddress addr, String s);

	/**
	 * 連線斷開
	 * 注意:此回撥是在新執行緒中執行的
	 * @param addr
	 * 連線到的Socket地址
	 */
	public abstract void onDisconnect(InetAddress addr);
	
}
public abstract class MediaServer implements Runnable{
	private static MediaServer mediaServer=null;
	//監聽埠
	private int port;
	//服務端執行標識
	private boolean runFlag;
	//客戶端響應集合(ConcurrentHashMap執行緒安全解決併發修改問題)
	private Map results = new ConcurrentHashMap();
	//客戶端集合
	private Map clinets = new ConcurrentHashMap();
	/**
	 * 例項化服務端
	 * @param port
	 */
	private MediaServer(int port) {
		this.port = port;
	}
	//加同步鎖(執行緒安全)
	private static synchronized void syncInit(){
		if(mediaServer==null){
			mediaServer = new MediaServer(port) {
				//伺服器停止
				@Override
				public void onServerStop() {
					
				}
				//接收方法
				@Override
				public void onReceive(MediaTransceiver client, String s) {
					this.getResults().put(client.falshId, s);
					delay(1);
					//System.out.println("接收結果:"+this.toString()+":"+this.getResults().get(client.falshId));
				}
				//斷開連線
				@Override
				public void onDisconnect(MediaTransceiver client) {
					if(null!=client){
						this.getClinets().remove(client.falshId);
						this.getResults().remove(client.falshId);
						updateLogoutTime(client.falshId);
					}
				}
				//連線失敗
				@Override
				public void onConnectFailed() {
					System.out.println("連線失敗!");
				}
				//連線上
				@Override
				public void onConnect(MediaTransceiver client) {
					
				}		
			};
			mediaServer.start();	
		}
	}
	/**
	 * 獲取伺服器例項與建立例項分開
	 * 如果在建構函式中丟擲異常,例項將永遠得不到建立
	 * @return
	 */
	public static synchronized MediaServer getInstance(){
		if(mediaServer == null){
			syncInit();
			mediaServer.updateAllOffLine();
		}
		mediaServer.checkOnLine();
		return mediaServer;
	}
	/**
	 * 伺服器啟動
	 * 如果啟動失敗,會回撥onServerStop()
	 */
	public void start(){
		runFlag = true;
		new Thread(this).start();
	}
	/**
	 * 伺服器停止
	 * 伺服器停止後,會回撥{@code onServerStop()}
	 */
	public void stop(){
		runFlag = false;
	}
	/**
	 * 監聽埠,接受客戶端連線(新執行緒中執行)
	 */
	@Override
	public void run() {
		try{
			final ServerSocket server = new ServerSocket(port);
			//無限等待啟動客戶端,直到伺服器關閉
			while(runFlag){
				try{
					final Socket socket = server.accept();
					//socket.setSoTimeout(10000);//十秒連線超時
					//與客戶端建立連線
					startClinet(socket);
				} catch (IOException e) {
					// ServerSocket物件創建出錯,伺服器啟動失敗
					this.onConnectFailed();
				}
			}
			//停止伺服器
			try{
				for(String key:clinets.keySet()){
					clinets.get(key).stop();
				}
				clinets.clear();
				results.clear();
				server.close();
				mediaServer=null;
			}catch (IOException e) {
				// ServerSocket物件創建出錯,伺服器啟動失敗
				e.printStackTrace();
			}
			
		} catch (IOException e) {
			// ServerSocket物件創建出錯,伺服器啟動失敗
			e.printStackTrace();
		}
		this.onServerStop();
	}
	
	/**
	 *  啟動客戶端收發
	 * @param socket
	 */
	public void startClinet(Socket socket){
		//伺服器端接收物件
		MediaTransceiver clinet = new MediaTransceiver(socket) {
			//接收資訊
			@Override
			public void onReceive(InetAddress addr, String s) {
				if("0".equals(s)){
					this.send("1");
				}else{
					System.out.println("接收:"+s);
					if(null!=s && s.startsWith("falshId:")){//新增客戶端
						s=s.replace("falshId:", "");	
						if(s!=null && !"null".equals(s.trim()) && hasFalshId(s)){
							this.falshId=s;
							clinets.put(this.falshId,this);
							updateOnline(falshId);
						}
					}else if(this.falshId!=null){//接受傳送訊息
						System.out.println(this.falshId+":"+s);
						MediaServer.this.onReceive(this, s);
					}else{
						//System.out.println("返回值"+s);
					}
				}
			}
			//伺服器斷開連線
			@Override
			public void onDisconnect(InetAddress addr) {
				if(null!=this && null!= this.falshId){
					MediaServer.this.onDisconnect(this);
				}
			}			
		};
		clinet.start();
		this.onConnect(clinet);
	}
	//向客戶端傳送命令
	public boolean send(String falshId,String s){	
		for(String key:clinets.keySet()){
			MediaTransceiver mt = clinets.get(key);
			if(null!=mt && mt.falshId!=null && mt.falshId.equals(falshId)){
				//連線不為空且soket是連線著的
				if(null!=mt.socket && mt.socket.isConnected()){
					mt.send(s);
					return true;
				}	
			}
		}
		return false;
	}
	//等待客戶端響應
	public String getResult(String falshId){
		String resultstr = this.getResults().get(falshId);
		if(null == resultstr){
			return "";
		}else{//去掉已返回命令
			this.getResults().remove(falshId);	
		}
		return resultstr;
	}
	//等待客戶端響應
	public Map getResults(){
		return results;
	}
	public Map getClinets() {
		return clinets;
	}	
	
	//修改離線時間
	public void updateLogoutTime(String falshId){
	    具體實現
	}
	//修改線上終端
	public void updateOnline(String falshId){
	    具體實現
	}
	public boolean hasFalshId(String falshId){
	    具體實現
	}
	//修改所有終端為離線
	public void updateAllOffLine(){
	    具體實現
	}
	
	/**
	 * 檢查是否線上
	 */
	public void checkOnLine(){
		for(String clinetId:clinets.keySet()){
			MediaTransceiver clinet = clinets.get(clinetId);
			clinet.checkOnLine();
		}
	}
	
	static void delay(int count) {
		try {
			Thread.sleep(count*1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	//---------------------------具體實現交給子類,或是例項化時實現-----------------------
	/**
	 * 客戶端:連線建立
	 * 注意:此回撥是在新執行緒中執行的
	 * @param client 客戶端物件
	 */
	public abstract void onConnect(MediaTransceiver client);

	/**
	 * 客戶端:連線建立失敗
	 * 注意:此回撥是在新執行緒中執行的
	 */
	public abstract void onConnectFailed();

	/**
	 * 客戶端:收到字串
	 * 注意:此回撥是在新執行緒中執行的
	 * @param client 客戶端物件
	 * @param s 字串
	 */
	public abstract void onReceive(MediaTransceiver client, String s);

	/**
	 * 客戶端:連線斷開
	 * 注意:此回撥是在新執行緒中執行的
	 * @param client MediaCline客戶端物件
	 */
	public abstract void onDisconnect(MediaTransceiver client);

	/**
	 * 伺服器停止
	 * 注意:此回撥是在新執行緒中執行的
	 */
	public abstract void onServerStop();
package com.ist.socket;

import java.net.InetAddress;
import java.net.Socket;


public abstract class MediaClinet implements Runnable{
	
	private int port;
	private String hostIP;
	private boolean mBlnNetIsOk = false;
	private MediaTransceiver transceiver;
	//終端標識碼
	private String falshId;
	
	public MediaClinet(String falshId) {
		this.falshId = falshId;
	}
	/**
	 * 建立連線
	 * 連線的建立將在新執行緒中進行
	 * 連線建立成功,回撥{@code onConnect()}
	 * 連線建立失敗,回撥{@code onConnectFailed()}
	 * @param hostIP 伺服器主機IP
	 * @param port 埠
	 */
	public void connect(String hostIP, int port) {
		this.hostIP = hostIP;
		this.port = port;
		new Thread(this).start();
	}
	
	private void connect(){
		try{
			Socket socket = new Socket(hostIP, port);
			transceiver = new MediaTransceiver(socket) {
				
				@Override
				public void onReceive(InetAddress addr, String s) {
					if(s.equals("0")){
						this.send("1");
					}
					MediaClinet.this.onReceive(transceiver, s);
					
				}
				
				@Override
				public void onDisconnect(InetAddress addr) {
					mBlnNetIsOk = false;
					MediaClinet.this.onDisconnect(transceiver,falshId);
				}
			};
			transceiver.start();
			mBlnNetIsOk = true;
			transceiver.sendFalshId(falshId);
			this.onConnect(transceiver,falshId);
			
		} catch (Exception e) {
			e.printStackTrace();
			this.onConnectFailed();
		}
	}
	@Override
	public void run(){
		connect();
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		while (true) {	
			if (mBlnNetIsOk == false) {
				connect();
			}
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	private boolean sendPacket(String string) {
		// TODO Auto-generated method stub
		if(transceiver!=null){
			return transceiver.send(string);
		}
		return false;
	}
	/**
	 * 斷開連線
	 * 連線斷開,回撥{@code onDisconnect()}
	 */
	public void disconnect() {
		if (transceiver != null) {
			transceiver.stop();
			transceiver = null;
		}
	}
	/**
	 * 獲取Socket收發器
	 * 
	 * @return 未連線則返回null
	 */
	public MediaTransceiver getTransceiver() {
		return isConnected() ? transceiver : null;
	}

	
	/**
	 * 判斷是否連線
	 * 
	 * @return 當前處於連線狀態,則返回true
	 */
	public boolean isConnected() {
		return mBlnNetIsOk;
	}
	/**
	 * 連線建立
	 * @param transceiver SocketTransceiver物件
	 */
	public abstract void onConnect(MediaTransceiver transceiver,String falshId);

	/**
	 * 連線建立失敗
	 */
	public abstract void onConnectFailed();

	/**
	 * 接收到資料
	 * 注意:此回撥是在新執行緒中執行的
	 * @param transceiver Socket收發物件
	 * @param s 字串
	 */
	public abstract void onReceive(MediaTransceiver transceiver, String s);

	/**
	 * 連線斷開
	 * 注意:此回撥是在新執行緒中執行的
	 * @param transceiver Socket收發物件
	 */
	public abstract void onDisconnect(MediaTransceiver transceiver,String falshId);
	
	
}

這個socket 通訊類大體框架是借鑑某個高手的部落格裡面的,具體我就不記得了(我只能說對不起了)。我大概說一下我程式碼裡面的功能:

1.MediaServer 採用的是單例模式,沒當有一個clinet 連線的時候就會建立一個MediaTransceiver(接收者)物件。

2.MediaClinet 建立一個MediaTransceiver(接收者)物件併發送一個標識自己的falshid。(因為ip是會變的,不會做為唯一鍵)

3.也是socket比較難處理的一個問題:非正常斷線,就是你拔掉網線的時候socket是檢查不到斷開的。在網上看了很多帖子:(1)設定超時時間

(2)設定keepAlive (3)設定心跳包。第一個超時時間是read的阻塞時間,並不是說socket的運行了這麼久然後就超時斷開。所以超時和斷開沒關係,

第二個keepAlive是十二分鐘檢測一次伺服器的活動狀態,個人覺得有點久。第三個正常斷開還行,非正常斷開(拔網線)就檢測不出了。

所以我通過傳送0給客戶端等待客戶端響應1回來,沒有則判斷為離線。等待響應用的是執行緒,不然如果斷線則會卡在read方法那裡。

 

最後java是開源,程式碼共享,互相進步,延續這種精神。如果有疑問可以加qq群124795403 交流。我不是大牛,我只是程式碼的搬運工。