1. 程式人生 > >【Java TCP/IP Socket程式設計】----進階----多工處理

【Java TCP/IP Socket程式設計】----進階----多工處理

簡介

 基本的TCP相應伺服器是一次只能處理一個客戶端請求,無法處理同時多個客戶端請求,Java中多執行緒技術解決這一問題。多執行緒有兩種方式:一是一客戶一執行緒;二是執行緒池;

1)一客戶一執行緒:即為每個連線建立一個執行緒來處理,伺服器端會迴圈執行,監聽指定埠的連線,反覆接收來自客戶端的連線請求,併為每個連線建立一個新執行緒來對其處理。

    缺點:一客戶一執行緒的方式雖然處理可以多個客戶端請求,但每個新執行緒都會消耗系統資源(如CPU),並且每個執行緒獨有自己的資料結構(如棧)也要消耗系統記憶體。另外一個執行緒阻塞是,JVM會儲存其狀態,選擇另外一個執行緒執行,並在上下文轉換時恢復阻塞執行緒的狀態。隨著執行緒數的增加,執行緒將消耗越來越多的資源。這會導致系統將花費更多時間來處理上下文的轉換和執行緒管理,更少的時間來對連線服務,加入額外的執行緒實際上可能會增加客戶端總服務時間。

    解決:通過限制匯流排程數並重復使用執行緒來避免一客戶一執行緒的缺陷。

2)使用執行緒池:與為每個執行緒建立新的執行緒不同,伺服器在啟動時建立固定數量的執行緒組成的執行緒池。當有一個客戶端請求過來時,執行緒池將分配一個執行緒處理,執行緒在處理完請求後將會返回執行緒池,為下一次請求處理做好準備。如果連線請求到達伺服器端,執行緒池中所有的執行緒都已被佔用,它們則在一個佇列中等待,直到有空閒的執行緒可用。

執行緒池服務端具體實現的步驟:

      1.伺服器端建立一個ServerSocket例項。

      2.建立N個執行緒,每個執行緒都反覆迴圈,從(共享的)ServerSocket例項中接收客戶端連線。當多個執行緒同時呼叫同一個ServerSocket例項的accept()方法將會阻塞等待,直到一個新連線建立成功。

      3.新建立連線對應Socket例項則只在選中的執行緒中返回。其他執行緒將阻塞,直到成功建立下一個連線和選中下一個幸運的執行緒。

    缺點:建立的執行緒池太少,客戶端可能等待很長時間才能獲取服務,執行緒池大小不能根據客戶端請求數量進行調整。

    解決:Java中提供一個排程工具(系統管理呼叫介面Executor),可以在系統負載時擴充套件執行緒池的大小,負載較輕時縮減執行緒池的大小。

3)系統管理排程:介面Executor

Executor介面代表了一個根據某種策略來執行Runnable例項的物件,其中包含了排隊和排程的細節,或者如何選擇要執行的任務。Executor介面只定義了一個方法:

public interface Executor {
  void execute(Runnable command);
}

Java中內建了大量的Executor介面實現,很簡單使用,也可以進行擴充套件性配置。其中一些還提供了處理維護執行緒等繁瑣細節的功能。ExecutorService介面繼承於Executor介面,提供了一個更高階的工具來關閉伺服器,包括正常關閉和突然關閉。ExecutorService還允許在完成任務後返回一個結果,這需要用到Callable介面,它和Runnable介面很像,只是多了一個返回值。

阻塞和超時

Socket的呼叫可能存多種原因而阻塞。

1)read(),receive()方法在沒有資料可讀時會阻塞。ServerSocket的accept()方法和Socket構造方法會阻塞等待,直到建立連線。

解決:使用socket,ServerSocket,以及DatagramSocket類的setSoTimeout()方法,來設定其阻塞的最長時間。指定時間內方法沒有返回將會丟擲異常InterruptedIOException。對於Socket例項,可以在read()方法前,在套接字的InputStream上呼叫available()方法檢測是否有可讀資料。

2)連線和寫資料

Socket類的建構函式會嘗試根據引數中指定的主機和埠來建立連線,並阻塞等待,直到連線成功建立或發生了系統定義的超時。不幸的是,系統定義的超時時間很長,而Java又沒有提供任何縮短它的方法。要改變這種情況,可以使用Socket類的無引數建構函式,它返回的是一個沒有建立連線的Socket例項。需要建立連線時,呼叫該例項的connect()方法,並指定一個遠端終端和超時時間。

write()方法呼叫也會阻塞等待,直到最後一個位元組成功寫入到了TCP實現的本地快取中。如果可用的快取空間比要寫入的資料小,在write()方法呼叫返回前,必須把一些資料成功傳輸到連線的另一端。

3)限制客戶端的時間

通過服務期限和當前計算出截止時間,每次呼叫read()結束後,重新計算當前和截止時間的差值,即服務截止時間,並將套接字的超時時間設定為該剩餘時間。

package com.tcp.ip.chapter4;
 
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
 
public class TimeLimitEchoProtocol implements Runnable {
 
	private static final int BUFSIZE = 32;
	private static final String TIMELIMIT = "10000";
	private static final String TIMELIMITPROP = "Timelimit";
	
	private static int timelimit;
	
	private Socket clntSock;
 
	private Logger logger;
	
	public TimeLimitEchoProtocol(Socket clntSock, Logger logger){
		
		this.clntSock = clntSock;
		this.logger = logger;
		timelimit = Integer.parseInt(System.getProperty(TIMELIMITPROP, TIMELIMIT));
	}
	
	public static void handleEchoClient(Socket clntSock, Logger logger){
		try {
			InputStream in = clntSock.getInputStream();
			OutputStream out = clntSock.getOutputStream();
			int recvMsgSize; 
			int totalBytesEchoed = 0;
			byte[] echoBuffer = new byte[BUFSIZE];
			//算出最後結束的時間
			long endTime = System.currentTimeMillis() + timelimit;
			int timeBoundMillis = timelimit;
			clntSock.setSoTimeout(timeBoundMillis);
			
			while ((timeBoundMillis > 0) && 
					((recvMsgSize = in.read(echoBuffer)) != -1)) {
				out.write(echoBuffer, 0, recvMsgSize);
				totalBytesEchoed += recvMsgSize;
				//當前剩餘時間
				timeBoundMillis = (int) (endTime - System.currentTimeMillis());
				clntSock.setSoTimeout(timeBoundMillis);
				logger.info("客戶端 " + clntSock.getRemoteSocketAddress() +
						", echoed " + totalBytesEchoed + " bytes.");
				
			}
		} catch (IOException ex) {
			logger.log(Level.WARNING, "Exception in echo protocol" , ex);
		}
	}
	
	public void run() {
		handleEchoClient(this.clntSock, this.logger);
		
	}
}

TCP多執行緒案例

客戶端的程式碼

public class TCPEchoClient {
  public static void main(String[] args) throws IOException {
    Socket clientSocket = new Socket("127.0.0.1",1234);
    //鍵盤錄入
    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    //傳送到客戶端
    PrintStream out = new PrintStream(clientSocket.getOutputStream());
    //讀取從伺服器端的回覆
    BufferedReader  getBr = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
    String input = null;
    while((input = br.readLine())!=null) {
      out.println(input);
      if("end".equals(input)) {
        break;
      }
      String echoInfo = getBr.readLine();
      System.out.println("Information from Server "+clientSocket.getRemoteSocketAddress()+": "+echoInfo);
    }
    clientSocket.close();
  }
}

處理任務的類

public class EchoProtocol implements Runnable{
    
  private Socket clientSocket;
  private Logger logger;
  public EchoProtocol(Socket socket ,Logger logger) {
    this.clientSocket= socket;
    this.logger = logger;
  }
  
  public static void handleEchoClient(Socket clientSocket,Logger logger) {
    try {
      //從客戶端讀取
      BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
      //從客戶端讀取的資訊,加echo+info進行回覆
      PrintStream out = new PrintStream(clientSocket.getOutputStream());
      String info = null;
      while((info =br.readLine())!=null) {
        if("end".equals(info)) {
          break;
        }
        System.out.println("information from client "+clientSocket.getRemoteSocketAddress()+": "+info);
        out.println("echo:"+info);
      }
      clientSocket.close();
    }catch(IOException e) {
    }
  }
  @Override
  public void run() {
    handleEchoClient(clientSocket,logger);
  }
}

1.一客戶一執行緒的方式

public class TCPEchoServerThread {
  public static void main(String[] args) throws IOException {
    ServerSocket serverSocket = new ServerSocket(1234);
    Logger logger = Logger.getLogger("practical");
    while(true) {
      Socket clientSocket = serverSocket.accept();
      Thread thread = new Thread(new EchoProtocol(clientSocket,logger));
      thread.start();
      logger.info("Created and started Thread "+thread.getName());
    }
  }
}

2.執行緒池的方式

public class TCPEchoServerPool {
  private static final int THREADPOOL = 5;
  public static void main(String[] args) throws IOException {
    ServerSocket serverSocket = new ServerSocket(1234);
    Logger logger = Logger.getLogger("practical");
    for(int i = 0; i < THREADPOOL;i++) {
      Thread thread = new Thread() {
        public void run() {
          while(true) {
            try {
              Socket clientSocket = serverSocket.accept();
              EchoProtocol.handleEchoClient(clientSocket, logger);
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        }
      };
      thread.start();
      logger.info("Created and Started thread :"+thread.getName());
    }
  }
}

3.系統管理排程:Executor介面

public class TCPEchoServerExecutor {
  public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(1234);
    ExecutorService executor = Executors.newCachedThreadPool();
    Logger logger = Logger.getLogger("pratical");
    while(true) {
      Socket clientSocket = socket.accept();
      executor.submit(new EchoProtocol(clientSocket,logger));
    }
  }
}