連線池實現方式
阿新 • • 發佈:2022-03-13
背景:
一次線上問題,發現控制代碼數非常高,經過定位,發現其實是有方法建立會話,沒有關閉導致的。
基於此,在程式碼裡面及時關閉會話,後來想了一下,還是要做一個連線池做管理比較穩妥。
以下是記錄了一個方式
實現方式
package com.file.service.utils; import com.jcraft.jsch.ChannelSftp; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.integration.file.remote.session.Session; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; /** * Sftp會話連線池 * * @author HuangXiaoFeng * @version $id:SftpSessionPool.java, v 1 2022/3/11 09:58:33, HuangXiaoFeng Exp $ */ @Component public class SftpSessionPool { private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory; @Autowired public void setSftpSessionFactory(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) { this.sftpSessionFactory = sftpSessionFactory; } private ConcurrentLinkedQueue<Session> sessionPool = new ConcurrentLinkedQueue<>(); // sftp 連線池大小 @Value("${sftp.poolSize:30}") private int poolSize; private AtomicInteger currentSize = new AtomicInteger(0); /** * 初如化session連線池 */ private synchronized void createSession() { Session session = sftpSessionFactory.getSession(); sessionPool.add(session); currentSize.incrementAndGet(); } /** * 獲取一個session */ public synchronized Session getSession() { if (sessionPool.size() > 0) { Session session = sessionPool.poll(); Object proxyInstance = Proxy.newProxyInstance(SftpSessionPool.class.getClassLoader(), session.getClass().getInterfaces(), new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (!"close".equals(method.getName())) { return method.invoke(session, args); } else { sessionPool.add(session); } return null; } }); return (Session) proxyInstance; } else { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } if (sessionPool.size() == 0 && currentSize.intValue() < poolSize) { createSession(); } return getSession(); } } }