一次socket長連線執行導致的效能問題
阿新 • • 發佈:2019-02-12
socket長連線篇
客戶端維持心跳導致出現效能問題
客戶端程式碼
實現一個定時傳送心跳包給服務端的執行緒,一個接收服務端返回訊息的執行緒。
package practice;
import client.Client;
import client.KeepAlive;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;
/**
* Created by sheng on 17/12/22.
*/
public class MyClient {
private boolean running =false;
interface ObjectAction {
Object doAction(Object obj);
}
class DefaultObjectAction implements ObjectAction {
@Override
public Object doAction(Object object) {
System.out.println(object);
return object;
}
}
public static void main(String[] args) {
MyClient client = new MyClient();
client.doStart();
}
public void doStart(){
try {
if(running)return;
Socket socket=new Socket("127.0.0.1",7890);
running=true;
Thread t1=new Thread(new KeepAliveWatchDog(socket));
t1.start();
Thread t2=new Thread(new ReceiveThread(socket));
t2.start();
Scanner input=new Scanner(System.in);
String command=input.next();
if(command.equals("cancel")){
doStop();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void doStop(){
if(running) running=false;
}
class KeepAliveWatchDog extends Thread{
Socket socket;
long lastReceive=System.currentTimeMillis();
public KeepAliveWatchDog(Socket socket){
this.socket=socket;
}
@Override
public void run() {
//執行緒靠running變數保持持續活躍,一旦主執行緒要求關閉,所有子執行緒要主動關閉套接字
//看門狗每3秒心跳一次
while(running){
try {
if (System.currentTimeMillis() - lastReceive > 2000) {
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(new KeepAlive());
System.out.println("send to server...");
objectOutputStream.flush();
lastReceive = System.currentTimeMillis();
} else {
Thread.sleep(10);
}
}catch(IOException e){
}catch(InterruptedException e){
}
}
if(!running){
close();
}
}
public void close(){
if(this.socket!=null){
try {
this.socket.close();
}catch(IOException ex){
}
}
System.out.println("KeepAliveWatchDog socket closed");
}
}
class ReceiveThread extends Thread{
Socket socket;
public ReceiveThread(Socket socket){
this.socket=socket;
}
@Override
public void run() {
while(running){
try {
InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
if (objectInputStream.available() > 0) {
Object object = objectInputStream.readObject();
ObjectAction oa = new DefaultObjectAction();
oa.doAction(object);
} else {
Thread.sleep(10);
}
}catch(IOException e){
}catch(InterruptedException e){
}catch(ClassNotFoundException e){
}
}
if(!running){
close();
}
}
public void close(){
if(this.socket!=null){
try {
this.socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("ReceiveThread socket closed");
}
}
}
服務端程式碼
實現一個守護執行緒進行socket監聽客戶端連線,每個客戶端socket都會建立一個新的處理執行緒處理客戶端的請求,並返回心跳包。
package practice;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
/**
* Created by sheng on 17/12/20.
* 根據訊息的型別,自動匹配處理器
*/
public class MyServer {
private final int PORT = 7890;
private boolean running = false;
private Thread thread1;
private Map mapping=new HashMap();
interface ObjectAction {
Object doAction(Object obj);
}
class DefaultObjectAction implements ObjectAction {
@Override
public Object doAction(Object object) {
System.out.println(object);
return object;
}
}
public static void main(String[] args) {
MyServer server = new MyServer();
server.doStart();
}
public void doStart() {
//啟動accept執行緒,進行
if (running) return;//確保伺服器單執行緒啟動
running = true;
thread1 = new Thread(new ConnWatchDogThread());
thread1.start();
System.out.println("server initial....");
Scanner input=new Scanner(System.in);
String next=input.next();
if(next.equals("cancel")){
doStop();
}
//啟動socket action執行緒接收處理
}
public void doStop() {
if (running) running = false;
}
/**
* 訊息-處理器模型,避免做冗餘的判斷
* 通過key獲取value物件處理器的方式比if..else...或設計模式都要簡單便捷.
* */
public void addMapping(Class classes, ObjectAction oa){
this.mapping.put(classes,oa);
}
public ObjectAction getAction(Class classes){
return this.mapping.get(classes);
}
/**
* 接收執行緒
*/
class ConnWatchDogThread extends Thread {
ServerSocket socket;
@Override
public void run() {
try {
socket = new ServerSocket(PORT, 5);
while (running) {
Socket socket1 = socket.accept();//阻塞方法,但是隻會接收一個,需要迴圈接收
System.out.println("accepted client:"+socket1.getRemoteSocketAddress());
Thread thread1 = new Thread(new SocketActionThread(socket1));
thread1.start();
}
if(!running)
close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void close() {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 訊息處理執行緒
*/
class SocketActionThread extends Thread {
Socket socket;
long lastReceiveTime = System.currentTimeMillis();
public SocketActionThread(Socket socket) {
this.socket = socket;
}
private int delay = 3000;
private boolean runnable=true;
@Override
public void run() {
//長連線每隔3秒都需要心跳喂狗一次,
while(running && runnable) {
try {
if (System.currentTimeMillis() - lastReceiveTime > delay && running) {
executeOvertime();
} else {
//執行讀寫socket
InputStream inputStream = socket.getInputStream();
if (inputStream.available() > 0) {
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
Object obj = objectInputStream.readObject();
lastReceiveTime = System.currentTimeMillis();
ObjectAction oa = new DefaultObjectAction();
Object out = oa.doAction(obj);
if (out != null) {
//回寫給客戶端
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(out);
}
} else {
Thread.sleep(10);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
if(!running){
close();
}
}
//超時,主動斷開socket
public void executeOvertime() {
if(runnable)runnable=false;
if (this.socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("client over time:"+socket.getRemoteSocketAddress());
}
public void close(){
if(this.socket!=null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
實驗結果
我開啟了一個server,開啟了5個client。client每隔2秒傳送一次心跳,伺服器端收到後會回覆。
我並沒有手動結束,但是日誌提示,客戶端主動斷開了。
2017-12-23 11:36:19 維持連線包 開始
2017-12-23 16:06:04 維持連線包 結束
記憶體使用資訊:
開啟了5個java程序,佔用記憶體分別為598MB,555MB,471MB,447MB,432MB.
手動關閉client端後,只剩下server 程序,佔用48MB。
客戶端哪個地方寫的有bug,導致客戶端記憶體佔用率這麼大,還是長連線的維持本來就佔用記憶體較高。
———-分割線
解決思路
是什麼引起記憶體增加? 監控到引起記憶體爆滿?
java記憶體分析工具 jconsole snip
1.從上圖我們可以看到每隔10分鐘,JVM就會將堆記憶體回收一次.
2.從上圖可以看到多個客戶端請求響應,服務端記憶體回收的頻率加快了.
3.上圖客戶端正常執行的記憶體佔用
4.上圖是當伺服器端意外斷開或者出現網路超時,客戶端出現了OOM.
5.當出現網路超時時,傳送執行緒並沒有正常退出.
通過下面的日誌輸出可以看到,IOException一直在報錯,我嘗試在異常中增加socket關閉操作,然而並沒有用.不是socket沒關閉造成的.
6.真正引起OOM的是執行緒處於不可控狀態,while一直在迴圈異常.
7.在發生exception時,要進行執行緒控制,這裡設定執行緒的區域性變數runnable=false;在異常發生時,讓while迴圈結束,在run方法最後釋放socket資源.
到此完整解決 2018-1-16