1. 程式人生 > >使用WebSocket幫助應用程序群集節點間通信

使用WebSocket幫助應用程序群集節點間通信

body ted 接收 puts n) asi res tin ebs

在一個標準群集場景中,節點通過一個數據包發送到協定好的多播IP地址:Port上,建立起通信。比如使用TCP插頭。

【使用Servlet模擬群集場景】

【1.連接上@ServerEndPoint】

【節點做的事】

//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query
URI uri = new URI("ws", "localhost:8080", path, null, null);

//連接上websocket
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri);

【Server做的事】
 2     public void onOpen(Session session, @PathParam("nodeId") String nodeId)
 3     {
 8         ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster.");11             //通知所有節點 有新的節點加入  因為這是在onOpen發生的,也就是終端連接上的代表加入
12         byte[] bytes = ClusterNodeEndpoint.toByteArray(message);
13 for(Session node : ClusterNodeEndpoint.nodes) 14 //發送ByteBuffer 15 node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));22 ClusterNodeEndpoint.nodes.add(session); 23 }

【2.Servlet負責路由請求和接收消息、Server負責傳遞給其他節點消息】

【節點處理get請求】

 2     protected void
doGet(HttpServletRequest request, HttpServletResponse response) 3 throws ServletException, IOException 4 { 6 //構造Message準備發給節點 7 ClusterMessage message = new ClusterMessage(this.nodeId, 8 "request:{ip:\"" + request.getRemoteAddr() + 9 "\",queryString:\"" + request.getQueryString() + "\"}"); 10 11 //使用序列化機制發送消息 12 try(OutputStream output = this.session.getBasicRemote().getSendStream(); 13 ObjectOutputStream stream = new ObjectOutputStream(output)) 14 { 15 stream.writeObject(message); 16 } 17 response.getWriter().append("OK"); 18 }

【節點接收消息】

 1  @OnMessage
 2     public void onMessage(InputStream input)
 3     {
 4         try(ObjectInputStream stream = new ObjectInputStream(input))
 5         {
 6             ClusterMessage message = (ClusterMessage)stream.readObject();
 7             System.out.println("INFO (Node " + this.nodeId +
 8                     "): Message received from cluster; node = " +
 9                     message.getNodeId() + ", message = " + message.getMessage());
10         }
11         catch(IOException | ClassNotFoundException e)
12         {
13             e.printStackTrace();
14         }
15     }

【Server傳遞給其他節點消息】

 1 @OnMessage
 2     public void onMessage(Session session, byte[] message)
 3     {
 4         try
 5         {
 6             for(Session node : ClusterNodeEndpoint.nodes)
 7             {
 8                 //向其他節點發送消息(消息來自當前節點)
 9                 if(node != session)
11                     node.getBasicRemote().sendBinary(ByteBuffer.wrap(message));
12             }
13         }
14         catch(IOException e)
15         {
16             System.err.println("ERROR: Exception when handling message on server");
17             e.printStackTrace();
18         }
19     }

使用WebSocket幫助應用程序群集節點間通信