Socket簡易分散式的多執行緒通訊
阿新 • • 發佈:2018-12-17
在分散式計算中,processor 與 master 需要進行通訊,兩端程式採用了簡單的 C/S 通訊模型。處理一個 job 時,由 server 負責任務分配:每個 client 執行完當前任務之後,主動向 server 請求下一個任務;server 根據排程演算法(schedule algorithm)為 client 返回任務號,並註明該任務是 local task(task 需要的 data block 已在記憶體中)還是 remote task(task 需要的 data block 不在記憶體中)。
Server端程式碼
// Server-side program
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Master side of a small client/server task dispatcher.
 *
 * <p>The server accepts any number of clients on port 4700 and serves each connection on its
 * own thread. Tasks are taken from {@link #Job} (the last element is handed out first) and each
 * task is given to exactly one client.
 *
 * <p>Wire protocol, one token per line:
 * <pre>
 *   client: "Apply", then its numeric client id
 *   server: "local"                      the task's data is already in that client's memory
 *           "remote", then a location    the client has to fetch the data from that location
 *           "Finish"                     no task is left
 *   client (only after "remote"): "update", then the id it fetched
 * </pre>
 *
 * <p>Thread-safety: the connection threads access {@link #Job}, {@link #ClientMemory} and
 * {@link #Local} only while holding a private lock. Code outside this class that mutates those
 * public collections while the server is running is not covered by that lock.
 */
public class Server {
    /** TCP port the server listens on. */
    private static final int PORT = 4700;

    private static final String CMD_APPLY = "Apply";
    private static final String CMD_UPDATE = "update";
    private static final String REPLY_LOCAL = "local";
    private static final String REPLY_REMOTE = "remote";
    private static final String REPLY_FINISH = "Finish";

    /** Guards every access this class makes to Job, ClientMemory and Local. */
    private static final Object STATE_LOCK = new Object();

    private final ServerSocket serverSocket;

    /** Currently open client connections; written by the accept loop and by the handler threads. */
    private final Set<Socket> allSockets = Collections.synchronizedSet(new HashSet<Socket>());

    /** Ids of the tasks that still have to be handed out. */
    public static List<Integer> Job = new ArrayList<>();

    /** Client id -> ids the server believes are held in that client's memory. */
    public static HashMap<Integer, List<Integer>> ClientMemory = new HashMap<>();

    /** Task id -> location that is sent to a client together with a "remote" reply. */
    public static HashMap<Integer, Integer> Local = new HashMap<>();

    /**
     * Binds the listening socket.
     *
     * @throws IllegalStateException if the port cannot be bound; failing here replaces the
     *         NullPointerException that would otherwise surface later in {@link #startService()}
     */
    public Server() {
        try {
            serverSocket = new ServerSocket(PORT);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot listen on port " + PORT, e);
        }
    }

    /**
     * Accepts clients forever and starts one handler thread per connection.
     *
     * @throws IOException if accepting a connection fails
     */
    public void startService() throws IOException {
        while (true) {
            Socket s = serverSocket.accept();
            System.out.println("已接收到Client的請求...");
            allSockets.add(s);
            ServerThread ss = new ServerThread(s);
            ss.start();
            System.out.println(ss.getId());
        }
    }

    /**
     * Runs the request/reply protocol for one client until the client stops sending.
     *
     * <p>Kept independent of the socket so the protocol can be exercised with in-memory streams.
     * Lines that are neither "Apply" nor "update" are ignored.
     *
     * @param reader source of the client's lines
     * @param writer sink for the replies; flushed after every reply
     * @throws IOException if reading fails or the client sends a non-numeric client id
     */
    static void serve(BufferedReader reader, PrintWriter writer) throws IOException {
        Integer clientId = null;          // id announced by the most recent "Apply"
        Integer pendingLocation = null;   // location sent with the most recent "remote" reply
        boolean awaitingUpdate = false;   // true between a "remote" reply and the matching "update"

        String command;
        while ((command = reader.readLine()) != null) {
            if (CMD_APPLY.equals(command)) {
                String idLine = reader.readLine();
                if (idLine == null) {
                    return; // client went away in the middle of a request
                }
                try {
                    clientId = Integer.valueOf(idLine.trim());
                } catch (NumberFormatException e) {
                    // The client is waiting for a reply we cannot give; end the session instead of hanging it.
                    throw new IOException("Malformed client id: " + idLine, e);
                }
                awaitingUpdate = false;

                // The id is parsed before a task is taken so a bad request never consumes a task.
                Integer taskId = pollTask();
                if (taskId == null) {
                    writer.println(REPLY_FINISH);
                } else if (isInMemory(clientId, taskId)) {
                    writer.println(REPLY_LOCAL);
                } else {
                    pendingLocation = locationOf(taskId);
                    awaitingUpdate = true;
                    writer.println(REPLY_REMOTE);
                    // Must be a full line: the client reads it with readLine().
                    // NOTE(review): prints "null" when Local has no entry for the task - confirm
                    // how a task without a known location should be reported.
                    writer.println(pendingLocation);
                }
                writer.flush();
            } else if (CMD_UPDATE.equals(command)) {
                // The payload is read to keep the stream in sync; as before, the value that is
                // recorded is the location this server handed out, not the echoed payload.
                if (reader.readLine() == null) {
                    return;
                }
                if (awaitingUpdate) {
                    remember(clientId, pendingLocation);
                    awaitingUpdate = false;
                }
            }
        }
    }

    /** Removes and returns the last task in Job, or returns null when no task is left. */
    private static Integer pollTask() {
        synchronized (STATE_LOCK) {
            return Job.isEmpty() ? null : Job.remove(Job.size() - 1);
        }
    }

    /**
     * Tells whether the given id is recorded in the client's memory; a client without an entry
     * holds nothing.
     *
     * NOTE(review): the lookup uses the task id while "update" stores the location from Local,
     * exactly as the previous code did - confirm that both share one id space.
     */
    private static boolean isInMemory(Integer clientId, Integer id) {
        synchronized (STATE_LOCK) {
            List<Integer> memory = ClientMemory.get(clientId);
            return memory != null && memory.contains(id);
        }
    }

    /** Returns the location registered in Local for the task, or null when there is none. */
    private static Integer locationOf(Integer taskId) {
        synchronized (STATE_LOCK) {
            return Local.get(taskId);
        }
    }

    /** Appends the id to the client's memory list, creating the list on first use. */
    private static void remember(Integer clientId, Integer id) {
        synchronized (STATE_LOCK) {
            List<Integer> memory = ClientMemory.get(clientId);
            if (memory == null) {
                memory = new ArrayList<>();
                ClientMemory.put(clientId, memory);
            }
            memory.add(id);
        }
    }

    /** Serves a single client connection on its own thread. */
    private class ServerThread extends Thread {
        Socket socket;

        public ServerThread(Socket socket) {
            this.socket = socket;
        }

        /** Runs the protocol, then closes the connection and forgets it. */
        @Override
        public void run() {
            try (Socket client = socket;
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
                 PrintWriter writer = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
                serve(reader, writer);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                allSockets.remove(socket);
            }
        }

        /**
         * Sends one line to every client that is currently connected.
         *
         * @param message text to send, without line terminator
         * @throws IOException if a client's output stream cannot be obtained
         */
        public void sendMessageToAllClient(String message) throws IOException {
            List<Socket> snapshot;
            // Iterating a synchronized set requires holding its monitor; copy and release quickly.
            synchronized (allSockets) {
                snapshot = new ArrayList<>(allSockets);
            }
            for (Socket s : snapshot) {
                sendMessageToClient(s, message);
            }
        }

        /**
         * Sends one line to a single client.
         *
         * @param s       connection to write to
         * @param message text to send, without line terminator
         * @throws IOException if the socket's output stream cannot be obtained
         */
        public void sendMessageToClient(Socket s, String message) throws IOException {
            // Deliberately not closed: closing the writer would close the client's socket.
            PrintWriter pw = new PrintWriter(
                    new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8));
            pw.println(message);
            pw.flush();
        }

        /**
         * Placeholder, not implemented: meant to build the message for a client ("local" when
         * the task's data is on that client, otherwise "remote" plus the data location).
         *
         * @return always null
         */
        public String assignTaskToClient(int taskId) {
            return null;
        }

        /** Placeholder, not implemented: does nothing. */
        public void updateClientMemory(int ClientId, int BlockId) {
        }

        /**
         * Placeholder, not implemented: meant to tell whether the data a task needs is in the
         * client's memory.
         *
         * @return always false
         */
        public boolean isLocal(int taskId) {
            return false;
        }

        /**
         * Placeholder, not implemented: meant to return the task the scheduling algorithm
         * assigns to the client.
         *
         * @return always 0
         */
        public int result() {
            return 0;
        }
    }

    /** Starts a server and serves clients until the process is stopped. */
    public static void main(String arg[]) {
        Server ser = new Server();
        try {
            ser.startService();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Client端程式碼
// Client-side program
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.in;

/**
 * Worker side of a small client/server task dispatcher.
 *
 * <p>After {@link #connectSer()} a background thread keeps asking the server for work until
 * the server answers "Finish" or closes the connection.
 *
 * <p>Wire protocol, one token per line:
 * <pre>
 *   client: "Apply", then its numeric client id
 *   server: "local"                      run the task right away
 *           "remote", then a location    fetch the data from that location, then run the task
 *           "Finish"                     no task is left; the client stops
 *   client (only after "remote"): "update", then the id it fetched
 * </pre>
 */
public class Client extends Thread {
    private static final String CMD_APPLY = "Apply";
    private static final String CMD_UPDATE = "update";
    private static final String REPLY_LOCAL = "local";
    private static final String REPLY_REMOTE = "remote";
    private static final String REPLY_FINISH = "Finish";

    /** Simulated duration of one task, in milliseconds. */
    private static final long TASK_COST_MS = 3000;

    /** Guards Memory, which is shared by every Client instance in this JVM. */
    private static final Object MEMORY_LOCK = new Object();

    private final Socket socket;
    private final int ClientId;

    /** Ids of the data blocks that are currently in memory. */
    static List<Integer> Memory = new ArrayList<Integer>();

    /**
     * Connects to the server.
     *
     * @param host     server host name or address
     * @param port     server port
     * @param clientId id this client announces with every request
     * @throws UnknownHostException if the host cannot be resolved
     * @throws IOException          if the connection cannot be established
     */
    public Client(String host, int port, int clientId) throws UnknownHostException, IOException {
        this(new Socket(host, port), clientId);
    }

    /** Wraps an existing socket; lets the protocol be exercised without a live server. */
    Client(Socket socket, int clientId) {
        this.socket = socket;
        ClientId = clientId;
    }

    /**
     * Starts the background thread that talks to the server.
     *
     * @throws IOException never thrown by the current implementation; kept for callers
     */
    public void connectSer() throws IOException {
        new ClientThread(socket).start();
    }

    /**
     * Requests and runs tasks until the server says "Finish", the stream ends, or the server
     * sends something this client does not understand.
     *
     * @param reader source of the server's lines
     * @param writer sink for the requests; flushed after every request
     * @throws IOException          if reading fails
     * @throws InterruptedException if the thread is interrupted while a task is running
     */
    void runSession(BufferedReader reader, PrintWriter writer) throws IOException, InterruptedException {
        while (true) {
            writer.println(CMD_APPLY);
            writer.println(ClientId);
            writer.flush();

            String reply = reader.readLine();
            if (reply == null || REPLY_FINISH.equals(reply)) {
                return;
            }
            if (REPLY_LOCAL.equals(reply)) {
                ProcessTask(TASK_COST_MS);
            } else if (REPLY_REMOTE.equals(reply)) {
                String locationLine = reader.readLine();
                if (locationLine == null) {
                    return;
                }
                int location;
                try {
                    location = Integer.parseInt(locationLine.trim());
                } catch (NumberFormatException e) {
                    System.err.println("Malformed data location from server: " + locationLine);
                    return;
                }
                getData(location);
                // NOTE(review): the received id is recorded as the fetched block because the same
                // value is echoed to the server as the "update" payload - confirm that the
                // location and the block id are meant to be the same number.
                updateMemory(location);
                // Full lines are required: the server reads them with readLine().
                writer.println(CMD_UPDATE);
                writer.println(location);
                writer.flush();
                ProcessTask(TASK_COST_MS);
            } else {
                // Re-applying after an unknown reply could loop forever against a broken server.
                System.err.println("Unexpected reply from server: " + reply);
                return;
            }
        }
    }

    /** Runs the session on the socket and closes the socket when the session ends. */
    private class ClientThread extends Thread {
        Socket socket;

        public ClientThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try (Socket connection = socket;
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
                 PrintWriter writer = new PrintWriter(
                         new OutputStreamWriter(connection.getOutputStream(), StandardCharsets.UTF_8))) {
                runSession(reader, writer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the flag so whoever owns this thread can still observe the interrupt.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Stands in for fetching remote data; it only prints where the data would come from.
     *
     * @param serverId id received from the server with a "remote" reply
     */
    public void getData(int serverId) {
        System.out.println("Get data from No." + serverId);
    }

    /**
     * Records a block as being in memory.
     *
     * @param BlockId id of the block to add
     */
    public void updateMemory(int BlockId) {
        // Memory is static, so the lock has to be shared by all instances, not per instance.
        synchronized (MEMORY_LOCK) {
            Memory.add(BlockId);
        }
    }

    /**
     * Simulates the time a task takes by sleeping.
     *
     * @param taskCost duration in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void ProcessTask(long taskCost) throws InterruptedException {
        Thread.sleep(taskCost);
    }

    /**
     * Placeholder, not implemented: meant to request a task and tell the server that the data
     * of a remote task has been pulled into local memory.
     *
     * @return always 0
     */
    public synchronized int ApplyTask() {
        return 0;
    }

    /** Closes the connection to the server. */
    public void close() {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Connects six clients (ids 1 to 6) to a server on 127.0.0.1:4700 and starts them. */
    public static void main(String arg[]) {
        List<Client> clients = new ArrayList<Client>();
        try {
            for (int id = 1; id <= 6; id++) {
                clients.add(new Client("127.0.0.1", 4700, id));
            }
            for (Client client : clients) {
                client.connectSer();
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leave the connections that were already opened dangling.
            for (Client client : clients) {
                client.close();
            }
        }
    }
}
Socket的通訊太過麻煩,後期會用其他的框架替換掉