1. 程式人生 > 其它 >連線池實現方式

連線池實現方式

背景:

一次線上問題,發現控制代碼數非常高,經過定位,發現其實是有方法建立會話,沒有關閉導致的。

基於此,在程式碼裡面及時關閉會話,後來想了一下,還是要做一個連線池做管理比較穩妥。

以下是記錄了一個方式

 

實現方式

package com.file.service.utils;

import com.jcraft.jsch.ChannelSftp;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Sftp會話連線池
 *
 * @author HuangXiaoFeng
 * @version $id:SftpSessionPool.java, v 1 2022/3/11 09:58:33, HuangXiaoFeng Exp $
 */

@Component
public class SftpSessionPool {

    private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    @Autowired
    public void setSftpSessionFactory(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        this.sftpSessionFactory = sftpSessionFactory;
    }

    private ConcurrentLinkedQueue<Session> sessionPool = new ConcurrentLinkedQueue<>();

    // sftp 連線池大小
    @Value("${sftp.poolSize:30}")
    private int poolSize;

    private AtomicInteger currentSize = new AtomicInteger(0);

    /**
     * 初如化session連線池
     */
    private synchronized void createSession() {
        Session session = sftpSessionFactory.getSession();
        sessionPool.add(session);
        currentSize.incrementAndGet();
    }

    /**
     * 獲取一個session
     */
    public synchronized Session getSession() {
        if (sessionPool.size() > 0) {
            Session session = sessionPool.poll();
            Object proxyInstance = Proxy.newProxyInstance(SftpSessionPool.class.getClassLoader(), session.getClass().getInterfaces(), new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (!"close".equals(method.getName())) {
                        return method.invoke(session, args);
                    } else {
                        sessionPool.add(session);
                    }
                    return null;
                }
            });
            return (Session) proxyInstance;
        } else {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (sessionPool.size() == 0 && currentSize.intValue() < poolSize) {
                createSession();
            }
            return getSession();
        }
    }
}