mina通訊,對於高併發的產生:java.io.IOException: Too many open files(開啟檔案控制代碼過多問題)
起因:由於業務系統有多個定時任務定時訪問銀行端,銀行每天也有大量業務訪問業務系統,都是通過mina通訊,部署在測試環境的系統每過一兩天開啟控制代碼過萬,生產的也是一週左右不重啟業務系統就會爆掉。一開始並不清楚到底是哪方面原因導致控制代碼增長這麼快,因為這是一個老系統,經過多次升級,大量的併發、多執行緒,所以只好做了一個定時任務,每週重啟生產業務系統。
說明:業務系統和銀行之間的通訊是通過c寫的轉換平臺轉發雙方的資訊的,結構:業務系統(請求)——>轉換平臺(轉發)——>銀行端(相應)——>轉換平臺(轉發)——>業務系統收到響應,銀行端訪問業務系統也是這樣的方式。
開始通過命令查程序佔用的控制代碼數,從大到小排序,一行一個程序ID
lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|more 其中第一列是開啟的控制代碼數,第二列是程序ID。
然後通過命令檢視單個程序所有開啟的檔案詳情
lsof -p 程序id
但是這樣檢視感覺太亂了,沒辦法檢視,於是通過命令:將執行結果內容輸出到日誌檔案中檢視
lsof -p 程序id > openfiles.log
發現是因為很多socket連線沒有釋放,這就能定位出大概是業務系統和銀行通訊的問題,分析原因:測試環境有多家銀行,有些銀行端測試環境沒有測試時並不會開啟,而業務系統直連的是轉換平臺,所以業務系統作為客戶端,訪問轉換平臺是通的,而轉換平臺轉發不出去,無響應,雖說轉換平臺設定了超時時間,但是業務端作為客戶端訪問時並沒有設定讀取超時時間,所以會導致客戶端等待因而導致控制代碼快速增長
下面貼出業務系統作為cilen端和service端的程式碼,並標誌出做出修改的部分。
client程式碼:
package com.fortunes.hmfms.network.client; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import org.apache.mina.core.RuntimeIoException; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ReadFuture; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.SocketConnector; import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fortunes.Message; import com.fortunes.hmfms.network.codec.MessageCodecFactory; import com.fortunes.hmfms.network.model.XmlEntity; public class Client extends IoHandlerAdapter{ final Logger logger = LoggerFactory.getLogger("ROOT"); public static final int CONNECT_TIMEOUT = 3000; public static final String RETURN_VALUE = "returnValue"; private InetSocketAddress serverAddress; private SocketConnector connector; private IoSession session; private MessageReceivedCallback messageReceivedCallback; public Client() { connector = new NioSocketConnector(); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MessageCodecFactory())); //connector.setHandler(new Client()); connector.setHandler(this); //設定超時 add by czp 20181207 connector.setConnectTimeoutMillis(CONNECT_TIMEOUT*2); } public boolean isConnected() { return (session != null && session.isConnected()); } public void connect(InetSocketAddress serverAddress){ setServerAddress(serverAddress); connect(); } public void reConnectIfNecessary(){ if(!isConnected()){ logger.info("連線被斷開,重新連線"); connect(); } } public void connect() { ConnectFuture connectFuture = getConnector().connect(getServerAddress()); connectFuture.awaitUninterruptibly(CONNECT_TIMEOUT); //add by czp 20181207 if (connectFuture.isDone()) { if (!connectFuture.isConnected()) { //若在指定時間內沒連線成功,則丟擲異常 logger.info("連線失敗"); getConnector().dispose(); //不關閉的話會執行一段時間後丟擲,too many open files異常,導致無法連線 } } if(connectFuture.isConnected()){ try { session = connectFuture.getSession(); session.getConfig().setUseReadOperation(true); logger.info("成功連線至{},本地地址:{}",session.getRemoteAddress(),session.getLocalAddress()); } catch (RuntimeIoException e) { logger.info("連線失敗",e); } }else{ connectFuture.cancel(); getConnector().dispose(); logger.info("連線失敗"); } } public XmlEntity sendRequest(Message message,MessageReceivedCallback callback){ /*setMessageReceivedCallback(callback); WriteFuture writeFuture = session.write(message); writeFuture.awaitUninterruptibly(); ReadFuture readFuture = session.read(); readFuture.awaitUninterruptibly(); return callback.process(this, session, (Message)readFuture.getMessage()); */ //change by czp 20181207 解決銀行端無響應出現控制代碼快速上漲 Message resp=null; try { setMessageReceivedCallback(callback); WriteFuture writeFuture = session.write(message); writeFuture.awaitUninterruptibly(); ReadFuture readFuture = session.read(); if(readFuture.awaitUninterruptibly(CONNECT_TIMEOUT*2, TimeUnit.MILLISECONDS)){ //Wait until the message is received resp=(Message)readFuture.getMessage(); //return callback.process(this, session, (Message)readFuture.getMessage()); }else{ logger.info("讀取服務端響應超時,服務端:"+readFuture.getSession().getServiceAddress()); if(session != null){ //關閉IoSession,該操作是非同步的,true為立即關閉,false為所有寫操作都flush後關閉 //這裡僅僅是關閉了TCP的連線通道,並未關閉Client端程式 session.getService().dispose(); session.close(false); //客戶端發起連線時,會請求系統分配相關的檔案控制代碼,而在連線失敗時記得釋放資源,否則會造成檔案控制代碼洩露 //當總的檔案控制代碼數超過系統設定值時[ulimit -n],則拋異常"java.io.IOException: Too many open files",導致新連線無法建立,伺服器掛掉 //所以,若不關閉的話,其執行一段時間後可能丟擲too many open files異常,導致無法連線 connector.dispose(); logger.info("讀取服務端響應超時,客戶端自動釋放資源。。。。。。。。。。。"); } } } catch (Exception e) { logger.info("Client.sendRequest出現異常:"+e.getStackTrace()); } return callback.process(this, session, resp); } public void close(){ if(isConnected()){ //關閉IoSession,該操作是非同步的,true為立即關閉,false為所有寫操作都flush後關閉 //這裡僅僅是關閉了TCP的連線通道,並未關閉Client端程式 session.getService().dispose();//add by czp 20181207 session.close(false); connector.dispose(); logger.info("客戶端關閉了連線\n"); } } @Override public void messageReceived(IoSession session, Object message) throws Exception { logger.info("收到來自"+session.getRemoteAddress()+"的訊息:\n{} - 本地埠:{}",message,session.getLocalAddress()); } @Override public void messageSent(IoSession session, Object message) throws Exception { logger.info("傳送至"+session.getRemoteAddress()+"的訊息:\n{}",message); logger.info("訊息已傳送!"); } @Override public void sessionClosed(IoSession session) throws Exception { session.getService().dispose();//add by czp 20181207 session.close(false);//add by czp 20181207 logger.info("連線至{}的連線被關閉!- 本地埠:{}",session.getRemoteAddress(),session.getLocalAddress()); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.close(true); logger.info("通訊出現異常{}",cause); } public void setMessageReceivedCallback(MessageReceivedCallback messageReceivedCallback) { this.messageReceivedCallback = messageReceivedCallback; } public MessageReceivedCallback getMessageReceivedCallback() { return messageReceivedCallback; } public void setConnector(SocketConnector connector) { this.connector = connector; } public SocketConnector getConnector() { return connector; } public void setServerAddress(InetSocketAddress serverAddress) { this.serverAddress = serverAddress; } public InetSocketAddress getServerAddress() { return serverAddress; } }
尤其是在方法sessionClosed中新增的session.getService().dispose();和session.close(false);在session關閉前對控制代碼的釋放。這很重要,如果沒有釋放,即使session關閉,被它開啟的檔案控制代碼會一直持有的。
下面是業務系統作為service服務端的程式碼MessageHandler:(服務端程式碼無修改)
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fortunes.Message;
import com.fortunes.hmfms.network.model.XmlEntity;
public class MessageHandler extends IoHandlerAdapter {
final Logger logger = LoggerFactory.getLogger("ROOT");
@Override
public void messageReceived(IoSession session, Object o){
logger.info("mina socket 通訊,收到來自遠端客戶端:{}的請求,銀行程式碼:{}",session.getRemoteAddress(),bankCode);
Message requestMessage = (Message)o;
XmlEntity responseXml = null;
XmlEntity requestXml = XmlEntity.parse(requestMessage.getContents());
if(requestXml == null){
responseXml = XmlEntity.create().createDefaultRequest(XML_ERROR);
responseXml.setResponseCodeAndMsg("000001", "XML報文格式解釋出錯!請檢查輸入的報文格式");
session.write(Message.createDefaultMessage(responseXml.buildAsBytes()));
}else{
try {
//業務處理程式碼
} catch (NumberFormatException e) {
logger.info("報文介面程式執行異常", e);
responseXml = XmlEntity.create().createDefaultResponse(requestXml);
responseXml.setResponseCodeAndMsg("000001", "系統異常!請檢查輸入資料,"+e.getMessage());
} catch (Exception e) {
logger.info("報文介面程式執行異常", e);
responseXml = XmlEntity.create().createDefaultResponse(requestXml);
responseXml.setResponseCodeAndMsg("000001", "系統異常!請檢查輸入資料,稍後再試");
}
}
session.write(Message.createDefaultMessage(responseXml.buildAsBytes()));
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)throws Exception {
session.close(true);
logger.info("通訊程式執行異常", cause);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
session.close(false);
// logger.info("連線已關閉!", session.getRemoteAddress());
logger.info("連線已關閉!");
}
}
通過上述修改,部署在測試環境測試後,發現再無控制代碼快速增長的情況,控制代碼數穩定在初始部署的條數。