1. 程式人生 > >一次socket長連線執行導致的效能問題

一次socket長連線執行導致的效能問題

socket長連線篇

客戶端維持心跳導致出現效能問題

客戶端程式碼

實現一個定時傳送心跳包給服務端的執行緒,一個接收服務端返回訊息的執行緒。
package practice;

import client.Client;
import client.KeepAlive;

import java.io.*;
import java.net.Socket;
import java.util.Scanner;

/**
 * Created by sheng on 17/12/22.
 */
public class MyClient {
    private boolean
running =false; interface ObjectAction { Object doAction(Object obj); } class DefaultObjectAction implements ObjectAction { @Override public Object doAction(Object object) { System.out.println(object); return object; } } public
static void main(String[] args) { MyClient client = new MyClient(); client.doStart(); } public void doStart(){ try { if(running)return; Socket socket=new Socket("127.0.0.1",7890); running=true; Thread t1=new Thread(new KeepAliveWatchDog(socket)); t1.start(); Thread t2=new
Thread(new ReceiveThread(socket)); t2.start(); Scanner input=new Scanner(System.in); String command=input.next(); if(command.equals("cancel")){ doStop(); } } catch (IOException e) { e.printStackTrace(); } } public void doStop(){ if(running) running=false; } class KeepAliveWatchDog extends Thread{ Socket socket; long lastReceive=System.currentTimeMillis(); public KeepAliveWatchDog(Socket socket){ this.socket=socket; } @Override public void run() { //執行緒靠running變數保持持續活躍,一旦主執行緒要求關閉,所有子執行緒要主動關閉套接字 //看門狗每3秒心跳一次 while(running){ try { if (System.currentTimeMillis() - lastReceive > 2000) { OutputStream outputStream = socket.getOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream); objectOutputStream.writeObject(new KeepAlive()); System.out.println("send to server..."); objectOutputStream.flush(); lastReceive = System.currentTimeMillis(); } else { Thread.sleep(10); } }catch(IOException e){ }catch(InterruptedException e){ } } if(!running){ close(); } } public void close(){ if(this.socket!=null){ try { this.socket.close(); }catch(IOException ex){ } } System.out.println("KeepAliveWatchDog socket closed"); } } class ReceiveThread extends Thread{ Socket socket; public ReceiveThread(Socket socket){ this.socket=socket; } @Override public void run() { while(running){ try { InputStream inputStream = socket.getInputStream(); ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); if (objectInputStream.available() > 0) { Object object = objectInputStream.readObject(); ObjectAction oa = new DefaultObjectAction(); oa.doAction(object); } else { Thread.sleep(10); } }catch(IOException e){ }catch(InterruptedException e){ }catch(ClassNotFoundException e){ } } if(!running){ close(); } } public void close(){ if(this.socket!=null){ try { this.socket.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("ReceiveThread socket closed"); } } }

服務端程式碼

實現一個守護執行緒進行socket監聽客戶端連線,每個客戶端socket都會建立一個新的處理執行緒處理客戶端的請求,並返回心跳包。

package practice;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by sheng on 17/12/20.
 * 根據訊息的型別,自動匹配處理器
 */
public class MyServer {
    private final int PORT = 7890;
    private boolean running = false;
    private Thread thread1;
    private Map mapping=new HashMap();
    interface ObjectAction {
        Object doAction(Object obj);
    }

    class DefaultObjectAction implements ObjectAction {
        @Override
        public Object doAction(Object object) {
            System.out.println(object);
            return object;
        }
    }

    public static void main(String[] args) {
        MyServer server = new MyServer();
        server.doStart();
    }

    public void doStart() {
        //啟動accept執行緒,進行
        if (running) return;//確保伺服器單執行緒啟動
        running = true;
        thread1 = new Thread(new ConnWatchDogThread());
        thread1.start();
        System.out.println("server initial....");
        Scanner input=new Scanner(System.in);
        String next=input.next();
        if(next.equals("cancel")){
            doStop();
        }
        //啟動socket action執行緒接收處理
    }

    public void doStop() {
        if (running) running = false;
    }

    /**
     * 訊息-處理器模型,避免做冗餘的判斷
     * 通過key獲取value物件處理器的方式比if..else...或設計模式都要簡單便捷.
     * */
    public void addMapping(Class classes, ObjectAction oa){
        this.mapping.put(classes,oa);
    }

    public ObjectAction getAction(Class classes){
        return this.mapping.get(classes);
    }
    /**
     * 接收執行緒
     */
    class ConnWatchDogThread extends Thread {
        ServerSocket socket;

        @Override
        public void run() {
            try {
                socket = new ServerSocket(PORT, 5);
                while (running) {
                    Socket socket1 = socket.accept();//阻塞方法,但是隻會接收一個,需要迴圈接收
                    System.out.println("accepted client:"+socket1.getRemoteSocketAddress());
                    Thread thread1 = new Thread(new SocketActionThread(socket1));
                    thread1.start();
                }
                if(!running)
                close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void close() {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 訊息處理執行緒
     */
    class SocketActionThread extends Thread {
        Socket socket;
        long lastReceiveTime = System.currentTimeMillis();
        public SocketActionThread(Socket socket) {
            this.socket = socket;
        }
        private int delay = 3000;
        private boolean runnable=true;
        @Override
        public void run() {
            //長連線每隔3秒都需要心跳喂狗一次,
            while(running && runnable) {
                try {
                    if (System.currentTimeMillis() - lastReceiveTime > delay && running) {
                        executeOvertime();
                    } else {
                        //執行讀寫socket
                        InputStream inputStream = socket.getInputStream();
                        if (inputStream.available() > 0) {
                            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
                            Object obj = objectInputStream.readObject();
                            lastReceiveTime = System.currentTimeMillis();
                            ObjectAction oa = new DefaultObjectAction();
                            Object out = oa.doAction(obj);
                            if (out != null) {
                                //回寫給客戶端
                                OutputStream outputStream = socket.getOutputStream();
                                ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
                                objectOutputStream.writeObject(out);
                            }

                        } else {
                            Thread.sleep(10);
                        }
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException ex) {
                    ex.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            if(!running){
                close();
            }
        }

        //超時,主動斷開socket
        public void executeOvertime() {
            if(runnable)runnable=false;
            if (this.socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("client over time:"+socket.getRemoteSocketAddress());
        }

        public void close(){
            if(this.socket!=null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
實驗結果

我開啟了一個server,開啟了5個client。client每隔2秒傳送一次心跳,伺服器端收到後會回覆。
我並沒有手動結束,但是日誌提示,客戶端主動斷開了。
2017-12-23 11:36:19 維持連線包 開始
2017-12-23 16:06:04 維持連線包 結束
記憶體使用資訊:
開啟了5個java程序,佔用記憶體分別為598MB,555MB,471MB,447MB,432MB.
手動關閉client端後,只剩下server 程序,佔用48MB。

客戶端哪個地方寫的有bug,導致客戶端記憶體佔用率這麼大,還是長連線的維持本來就佔用記憶體較高。

———-分割線

解決思路
是什麼引起記憶體增加? 監控到引起記憶體爆滿?
java記憶體分析工具 jconsole snip

服務端正常執行
1.從上圖我們可以看到每隔10分鐘,JVM就會將堆記憶體回收一次.

服務端接收多個客戶端請求
2.從上圖可以看到多個客戶端請求響應,服務端記憶體回收的頻率加快了.

客戶端正常執行
3.上圖客戶端正常執行的記憶體佔用

當服務端意外斷開,客戶端執行
4.上圖是當伺服器端意外斷開或者出現網路超時,客戶端出現了OOM.

異常執行的客戶端執行緒執行,Thread-1是傳送執行緒
5.當出現網路超時時,傳送執行緒並沒有正常退出.
通過下面的日誌輸出可以看到,IOException一直在報錯,我嘗試在異常中增加socket關閉操作,然而並沒有用.不是socket沒關閉造成的.
thread-3是接收執行緒

IO異常產生,但是處理不正確
6.真正引起OOM的是執行緒處於不可控狀態,while一直在迴圈異常.
正常處理客戶端異常後
7.在發生exception時,要進行執行緒控制,這裡設定執行緒的區域性變數runnable=false;在異常發生時,讓while迴圈結束,在run方法最後釋放socket資源.

到此完整解決 2018-1-16