1. 程式人生 > >Thrift連線池,spring配置化,透明化呼叫(優化)

Thrift連線池,spring配置化,透明化呼叫(優化)

Thrift連線池,spring配置化,透明化呼叫(優化2)

2017.0409優化:
連線池優化:公平鎖替換阻塞執行緒佇列,提升連線池效能,服務端介面設定 200毫秒處理時間 ,1 個selectThread ,1 個 workThread 場景下,客戶端 2000執行緒啟動Thrfit 遠端呼叫,具有良好的穩定性。

github專案地址:

由於看了dubbo 的程式碼,就在思考,thrift 也能通過 invoker 與 proxy 的方法將服務透明化,就與普通的bean 一樣的呼叫方式

經過壓力測試,連線池效果還是挺不錯的

瞭解連線池的都知道,每次建立遠端資源連線並進行請求,再斷開連線,對系統資源的消耗是巨大的,所以每次請求thrift的server 並沒有必要關閉io資源,可以建立連線池處理;

thrift 版本號 0.10.0

thrift 檔案:
IThriftInfoTestService.thrift
IThriftTestService.thrift
IThriftInfoTestService.thrift

程式碼:

namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service

service  IThriftInfoTestService {
  string
showInfoData(1:string name,2:bool b2,3:map<string,string> m2) }
IThriftTestService.thrift

程式碼:

namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service

service  IThriftTestService {
  string
showThriftResult(1:string name,2:bool b2,3:map<string,string> m2) }

provider (服務端)

spring配置檔案 spring-thrift.xml


    <!-- 兩個thrift 服務的實現類 -->
    <bean id="thriftInfoTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftInfoTestServiceImpl" ></bean>
    <bean id="thriftTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftTestServiceImpl" ></bean>



    <!-- Processor呼叫過程工廠,建立多服務類的工廠類 實現 FactoryBean 將bean型別設定成TProcessor  -->
    <bean id="thriftProcessorFactory"  class="com.java.core.rpc.thrift.supports.ThriftProcessorFactory" 
        init-method="convertTargetToTProcessor"
        >
        <property name="targets" >
            <list >
                <ref bean="thriftInfoTestServiceTarget"/>
                <ref bean="thriftTestServiceTarget"/>
            </list>
        </property>

    </bean>




    <!-- serverTransport -->
    <bean class="org.apache.thrift.transport.TNonblockingServerSocket" id="transport" >
        <!-- 埠號 -->
           <constructor-arg index="0" value="29999" />
    </bean>



    <bean class="org.apache.thrift.transport.TFramedTransport.Factory" id ="transportFactory"></bean>
    <bean class="org.apache.thrift.protocol.TCompactProtocol.Factory" id ="protocolFactory"></bean>

    <!-- serverArgs  實現 FactoryBean 用與spring引數設定,因為 workerThreads 等引數原始碼中沒有set/get 方法-->
    <bean class="com.java.core.rpc.thrift.provider.TServerArgsFactory" id="serverArgs" >
        <!-- 埠號 -->
        <property name="transport" ref="transport"  />
        <property name="selectorThreads" value="10"  />
        <property name="workerThreads" value="100"  />
        <property name="transportFactory" ref="transportFactory"  />
        <property name="protocolFactory"  ref="protocolFactory" />

        <!-- 呼叫過程 -->
        <property name="processor"  ref="thriftProcessorFactory" />
    </bean>

    <!-- server -->
    <bean class="org.apache.thrift.server.TThreadedSelectorServer" id="threadedSelectorServer" >
        <!-- 埠號 -->
           <constructor-arg index="0" ref="serverArgs" />
    </bean>


    <!--  thrift 的provider的啟動類 -->
    <bean class="com.java.core.rpc.thrift.provider.AppThriftServer" init-method="initThriftServer"  >
        <property name="transport" ref="transport"  />
        <property name="args" ref="serverArgs"  />
        <property name="server"  ref="threadedSelectorServer" />
    </bean>


新增類 TServerArgsFactory.java 用來代理 TThreadedSelectorServer.Args 類的屬性,原因是 TThreadedSelectorServer.Args 類中的屬性很多沒有get/set 方法或者格式不統一,這樣做個人覺得好用了,可以在spring優雅的進行配置了


/**
 * @author zhuangjiesen
 *封裝 TThreadedSelectorServer.Args 類的屬性 使得能在spring 配置
 */
public class TServerArgsFactory implements FactoryBean<Args> {

    private TServerTransport transport;
    private int selectorThreads;
    private int workerThreads;
    private TTransportFactory transportFactory;
    private TProtocolFactory protocolFactory;
    private TProcessor processor;

    @Override
    public Args getObject() throws Exception {

        TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args((TNonblockingServerSocket)transport);
        args.processor(processor);
        args.protocolFactory(protocolFactory);
        args.transportFactory(transportFactory);
        if (selectorThreads > 0) {
            args.selectorThreads = selectorThreads;
        }
        if (workerThreads > 0) {
            args.workerThreads(workerThreads);
        }
        // TODO Auto-generated method stub
        return args;
    }

    @Override
    public Class<?> getObjectType() {
        // TODO Auto-generated method stub
        return TThreadedSelectorServer.Args.class;
    }

    @Override
    public boolean isSingleton() {
        // TODO Auto-generated method stub
        return true;
    }

    public TServerTransport getTransport() {
        return transport;
    }

    public void setTransport(TServerTransport transport) {
        this.transport = transport;
    }

    public int getSelectorThreads() {
        return selectorThreads;
    }

    public void setSelectorThreads(int selectorThreads) {
        this.selectorThreads = selectorThreads;
    }

    public int getWorkerThreads() {
        return workerThreads;
    }

    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }

    public TTransportFactory getTransportFactory() {
        return transportFactory;
    }

    public void setTransportFactory(TTransportFactory transportFactory) {
        this.transportFactory = transportFactory;
    }

    public TProtocolFactory getProtocolFactory() {
        return protocolFactory;
    }

    public void setProtocolFactory(TProtocolFactory protocolFactory) {
        this.protocolFactory = protocolFactory;
    }

    public TProcessor getProcessor() {
        return processor;
    }

    public void setProcessor(TProcessor processor) {
        this.processor = processor;
    }





}

ThriftInfoTestServiceImpl.java

package com.java.core.rpc.thrift.service.impl;



import java.util.Map;

import org.apache.thrift.TException;

import com.alibaba.fastjson.JSONObject;
import com.java.core.rpc.thrift.service.IThriftInfoTestService;

public class ThriftInfoTestServiceImpl implements IThriftInfoTestService.Iface {

    @Override
    public String showInfoData(String name, boolean success, Map<String, String> map) throws TException {
        // TODO Auto-generated method stub

        //模擬時延處理
        ThreadHelper.sleep(200);


        System.out.println(" ThriftInfoTestServiceImpl doing ...showInfoData()... ");
        System.out.println(" map : "+ JSONObject.toJSONString(map));
        System.out.println(" success : "+ success);
        System.out.println(" name : "+ name);
        String result = name +" time : " + System.currentTimeMillis();

        return result;
    }

}

ThreadHelper.java 時延工具類

public class ThreadHelper {


    /**
     * 休眠模擬 rpc執行時間
     * @param timeßß
     */
    public static void sleep(long timeßß){
        try {

            Thread.currentThread().sleep(150);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }

}

ThriftTestServiceImpl.java 程式碼就不貼了。都一樣
ThriftProcessorFactory.java processor工廠類


/**
 * 
 * 實現 FactoryBean 介面
 * getObjectType 返回 TProcessor 使得可以在spring 中直接設定為 TProcessor型別的 屬性
 * @author zhuangjiesen
 *
 */
public class ThriftProcessorFactory implements FactoryBean<TProcessor> {



    private final static String IFACE_NAME="$Iface";
    private final static String PROCESS_NAME="$Processor";

    private List<Object> targets;


    private Map<String, TProcessor> processors;


    private TMultiplexedProcessor multiplexedProcessor;

    public TProcessor getProcessor(){
        return multiplexedProcessor;
    }


    public TMultiplexedProcessor getMultiplexedProcessor() {
        return multiplexedProcessor;
    }


    public void setMultiplexedProcessor(TMultiplexedProcessor multiplexedProcessor) {
        this.multiplexedProcessor = multiplexedProcessor;
    }


    public ThriftProcessorFactory() {
        super();
        // TODO Auto-generated constructor stub
    }


    public Map<String, TProcessor> getProcessors() {
        return processors;
    }


    public void setProcessors(Map<String, TProcessor> processors) {
        this.processors = processors;
    }





    public List<Object> getTargets() {
        return targets;
    }

    public void setTargets(List<Object> targets) {
        this.targets = targets;
    }


    /**
     * 將實現類封裝成TProcessor類的集合
     * 
     */
    public void convertTargetToTProcessor(){
        if (targets.isEmpty()) {
            return ;
        }
        processors = new HashMap<String, TProcessor>();
        try {
            for (Object target : targets ) {
                Class iface= target.getClass().getInterfaces()[0];
                String ifaceName =iface.getName();
                String serviceName = ifaceName.substring(0, ifaceName.lastIndexOf(IFACE_NAME));

                Class processorClazz = Class.forName(serviceName.concat(PROCESS_NAME));
                Object processorObj = processorClazz.getConstructor(iface).newInstance(iface.cast(target));
                if (processorObj instanceof TProcessor) {
                    TProcessor processor = (TProcessor) processorObj;
                    processors.put(serviceName, processor);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        initMultiplexedProcessor();
    }

    /**
     * 初始化多服務呼叫過程 TMultiplexedProcessor 
     * 並且註冊服務
     */
    private void initMultiplexedProcessor(){
        if (processors.isEmpty()) {
            return ;
        }
        multiplexedProcessor = new TMultiplexedProcessor();
        Set<String> serviceNames = processors.keySet();
        for (String serviceName : serviceNames) {
            if (!processors.containsKey(serviceName)) {
                continue;
            }
            multiplexedProcessor.registerProcessor(serviceName, processors.get(serviceName));
        }
    }


    @Override
    public TProcessor getObject() throws Exception {
        // TODO Auto-generated method stub
        return getProcessor();
    }


    @Override
    public Class<?> getObjectType() {
        // TODO Auto-generated method stub
        return TProcessor.class;
    }


    @Override
    public boolean isSingleton() {
        // TODO Auto-generated method stub
        return true;
    }


}


AppThriftServer.java 重點,服務啟動
優化:屬性設定成全域性變數,歸入spring配置



public class AppThriftServer  implements ApplicationContextAware,InitializingBean {


    /**執行緒池**/
    private static ExecutorService executorService;
    // ApplicationContextAware 可以呼叫spring 生命週期獲取上下文
    private static ApplicationContext context;


    private TServerTransport transport;
    private TThreadedSelectorServer.Args args;
    private TServer server;


    public AppThriftServer() {
        super();
        executorService = Executors.newSingleThreadExecutor();
    }


    public void initThriftServer(){
        //初始化設定
        initConfig();   
        executorService.execute(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println(" ThriftServer start ing ....");
                try { 
                    if (!server.isServing()) {
                        server.serve();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (transport != null) {
                        transport.close();
                    }

                }
            }
        });

    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // TODO Auto-generated method stub
        context=applicationContext;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        // TODO Auto-generated method stub


    }



    /**
     * 初始化引數
     */
    private void initConfig(){
    }


    public TServerTransport getTransport() {
        return transport;
    }


    public void setTransport(TServerTransport transport) {
        this.transport = transport;
    }


    public TThreadedSelectorServer.Args getArgs() {
        return args;
    }


    public void setArgs(TThreadedSelectorServer.Args args) {
        this.args = args;
    }


    public TServer getServer() {
        return server;
    }


    public void setServer(TServer server) {
        this.server = server;
    }




}


接著啟動專案,服務端便開始監聽;

接著是 customer 客戶端

呼叫主體

public class App 
{

    private static ApplicationContext applicationContext;

    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
        init();
        final TestService testService=applicationContext.getBean(TestService.class);

        // 回撥方法
        ITestListener testListener = new ITestListener() {
            public void doTest() {
                testService.doThriftInfoTest();
            }
        };

        //測試方法
        doThriftTest(1,testListener);
    }


    //併發執行緒,用來測試併發量
    public static void doThriftTest (int threadCounts,final ITestListener testListener){
        final Object waitObj = new Object();

        for (int i=0 ;i< threadCounts ;i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    synchronized (waitObj) {
                        try {
                            waitObj.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }


                    if (testListener != null) {
                        testListener.doTest();
                    }



                }
            }).start();
        }


        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        synchronized (waitObj) {
            waitObj.notifyAll();
        }

    }


    //測試方法回撥
    public interface ITestListener {
        public void doTest();
    }

    public static void init(){

        applicationContext = new FileSystemXmlApplicationContext("/resources/applicationContext.xml"); 

    }
}
現在是直接在spring配置檔案中配置thrift service的bean,呼叫程式碼跟一般的bean一模一樣

TestService.java 程式碼:




    public void doThriftTest(){

        //運用動態代理 使thrift 介面透明化呼叫
        // 與普通的spring bean 一樣呼叫
        IThriftTestService.Iface client = BeanHelper.getContext().getBean(IThriftTestService.Iface.class);
        Map<String, String> map =new HashMap<String, String>();
        map.put("name", "莊傑森");
        map.put("IThriftTestService", "client");
        map.put("content", "thrift 的 rpc 呼叫");
        String name = "zhuangjiesen ...IThriftTestService doing...";
        try {
            client.showThriftResult(name, true, map);
        } catch (TException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }



    public void doThriftInfoTest(){

        //運用動態代理 使thrift 介面透明化呼叫
        // 與普通的spring bean 一樣呼叫
        IThriftInfoTestService.Iface client = BeanHelper.getContext().getBean(IThriftInfoTestService.Iface.class);
        Map<String, String> map =new HashMap<String, String>();
        map.put("name", "莊傑森");
        map.put("IThriftInfoTestService", "client");
        map.put("content", "thrift 的 rpc 呼叫");
        String name = "zhuangjiesen ...IThriftInfoTestService doing...";
        try {
            client.showInfoData(name, true, map);
        } catch (TException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


thrift中 spring 的配置檔案 :spring-thrift.xml

程式碼:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:mvc="http://www.springframework.org/schema/mvc" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd    
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd  
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
>

<!-- thrift 連線池 -->
    <bean id="thriftConnectionPool" class="com.java.core.rpc.thrift.supports.ThriftConnectionPool">
        <property name="host" value="127.0.0.1"/>
        <property name="port" value="29999"/>
        <property name="maxConnections" value="20"/>
        <property name="minConnections" value="5"/>
        <property name="protocolTypeClass" value="org.apache.thrift.protocol.TCompactProtocol"/>

    </bean>


    <!-- 服務管理類 -->
    <bean id="thriftServiceManager" class="com.java.core.rpc.thrift.supports.ThriftServiceManager" >
        <property name="thriftConnectionPool" ref="thriftConnectionPool"></property>


    </bean>

    <!-- 通過實現 FactoryBean ,在進行動態代理,實現對服務的配置-->
    <bean id="thriftTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
        <property name="thriftServiceManager" ref="thriftServiceManager" ></property>
        <property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftTestService.Iface" ></property>

    </bean>


    <bean id="thriftInfoTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
        <property name="thriftServiceManager" ref="thriftServiceManager" ></property>
        <property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftInfoTestService.Iface" ></property>

    </bean>

</beans>


ThriftSpringFactoryBean.java 可以查詢關於spring容器的 FactoryBean 介面用法 用法對照 org.springframework.aop.framework.ProxyFactoryBean

程式碼


public class ThriftSpringFactoryBean<T> implements FactoryBean<T> {

    private ThriftServiceManager thriftServiceManager;

    private Class serviceIfaceClass;

    public T getObject() throws Exception {
        return (T) thriftServiceManager.getThriftClient(serviceIfaceClass);
    }

    public Class<T> getObjectType() {
        return serviceIfaceClass;
    }

    public boolean isSingleton() {
        return true;
    }


    public ThriftServiceManager getThriftServiceManager() {
        return thriftServiceManager;
    }

    public void setThriftServiceManager(ThriftServiceManager thriftServiceManager) {
        this.thriftServiceManager = thriftServiceManager;
    }

    public Class getServiceIfaceClass() {
        return serviceIfaceClass;
    }

    public void setServiceIfaceClass(Class serviceIfaceClass) {
        this.serviceIfaceClass = serviceIfaceClass;
    }
}

ThriftServiceManager 服務管理類

程式碼:


public class ThriftServiceManager {

    private ThriftConnectionPool thriftConnectionPool;


//    private final ConcurrentHashMap<String,Object> thriftClientCache = new ConcurrentHashMap();


    public <T> T getThriftClient(Class<T> serviceIfaceClass){
        if (!serviceIfaceClass.isInterface()) {
            throw new RuntimeException("型別錯誤");
        }
        T client =null;
        ThriftServiceProxyInvocation proxyInvocation =new ThriftServiceProxyInvocation();
        // 代理介面類
        proxyInvocation.setIfaceClazz(serviceIfaceClass);
        // 設定連線池
        proxyInvocation.setThriftConnectionPool(thriftConnectionPool);
        // 獲取遠端服務代理類
        client = (T) Proxy.newProxyInstance(serviceIfaceClass.getClassLoader(), new Class[]{ serviceIfaceClass }, proxyInvocation);
        return client;
    }


    public ThriftConnectionPool getThriftConnectionPool() {
        return thriftConnectionPool;
    }

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }
}


ThriftServiceProxyInvocation.java 動態代理類,獲取連線並呼叫遠端方法

程式碼:


public class ThriftServiceProxyInvocation implements InvocationHandler {

    /*thrift 服務類的iface 類*/
    private Class ifaceClazz;
    /* thrift 連線池*/
    private ThriftConnectionPool thriftConnectionPool;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // TODO Auto-generated method stub
        System.out.println(" ThriftServiceProxyInvocation  invoke doing before ....");
        if (ifaceClazz == null) {
            return null;
        }
        Object result = null;
        try {
            String serviceIfaceClassName = ifaceClazz.getName();
            String serviceClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,"");
            String serviceClientClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,ThriftConstant.CLIENT_NAME);
            Class clientClazz = Class.forName(serviceClientClassName);
            // 連線池中選擇 protocol
            TProtocol protocol = thriftConnectionPool.getProtocol(serviceClassName);
            Object clientInstance= clientClazz.getConstructor(TProtocol.class).newInstance(protocol);
            result=method.invoke(clientInstance, args);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        } finally {
            // 回收 protocol
            thriftConnectionPool.recycleProtocol();
        }

        System.out.println(" ThriftServiceProxyInvocation  invoke doing after ....");

        return result;
    }



    public Class getIfaceClazz() {
        return ifaceClazz;
    }


    public void setIfaceClazz(Class ifaceClazz) {
        this.ifaceClazz = ifaceClazz;
    }


    public ThriftConnectionPool getThriftConnectionPool() {
        return thriftConnectionPool;
    }

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }

}

接下來就是重點 thrift 連線池 ThriftConnectionPool.java

程式碼:

重點方法:getProtocolInternal() 獲取連線,優化:用reentrantLock 公平鎖 代替阻塞執行緒等待佇列,
加入協議可配置屬性


public class ThriftConnectionPool implements InitializingBean {
    //預設協議
    private final Class DEFAULT_PROTOCOL_CLASS = TBinaryProtocol.class;


    private String host;
    private int port;
    private int minConnections = 5;
    private int maxConnections = 10;
    private volatile int connectionsCount = 0;
    private int connectTimeout;
    private int socketTimeout;
    private int timeout;

    private Class protocolTypeClass;



    private int waitQueueSeconds = 10 ;
    private int recycleSeconds=10;

    // TProtocol 連線
    private LinkedBlockingQueue<TProtocol> blockingQueue;
//    private LinkedBlockingQueue<Thread> waitingThreadBlockingQueue ;
    //公平鎖 排隊處理
    private Lock threadLock = new ReentrantLock(true);

    private ThreadLocal<TProtocol> protocolLocal = new ThreadLocal<TProtocol>();


    //回收執行緒
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);;

    public ThriftConnectionPool() {

    }

    //初始化連線池
    public synchronized void initThriftConnectionPool(){
        blockingQueue = new LinkedBlockingQueue<TProtocol>();
        for (int i = 0 ; i < minConnections ; i++) {
            blockingQueue.add(createNewProtocol());
        }
        setDefaultProtocolClass();


        //回收執行緒
        scheduledExecutorService.schedule(new Runnable() {
            public void run() {


                reducePool();


            }
        },recycleSeconds, TimeUnit.SECONDS);

    }

    public void setDefaultProtocolClass(){
        if (protocolTypeClass != null) {
            return ;
        }
        protocolTypeClass = DEFAULT_PROTOCOL_CLASS;
    }


    //建立協議
    public synchronized TProtocol createNewProtocol(){
        TProtocol protocol = null;
        if (connectionsCount < maxConnections) {
            try {
                TSocket socket = new TSocket(host,port);
                socket.setConnectTimeout(connectTimeout);
                socket.setSocketTimeout(socketTimeout);
                socket.setTimeout(timeout);



                TFramedTransport framedTransport = new TFramedTransport(socket);




                setDefaultProtocolClass();
                Constructor protocalConstructor = protocolTypeClass.getConstructor(TTransport.class);
                protocol =(TProtocol) protocalConstructor.newInstance(framedTransport);

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (protocol != null) {
                    connectionsCount ++ ;
                    try {
                        protocol.getTransport().open();
                    } catch (TTransportException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return protocol;
    }

    //從連線池中獲取Protocol
    public TProtocol getProtocolInternal(){
        protocolLocal.remove();
        TProtocol protocol = null;
        protocol = blockingQueue.poll();
        if (protocol == null) {
            protocol = createNewProtocol();
            //大於最大連線數建立 protocol = null
            if (protocol == null) {
                threadLock.lock();
                try {
                    protocol = blockingQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    threadLock.unlock();
                }

            }
//            waitingThreadBlockingQueue

        } else if (protocol != null && (!protocol.getTransport().isOpen())) {
            //取到 protocol 但是已經關閉 重新建立
            protocol = createNewProtocol();
        }
        protocolLocal.set(protocol);
        return protocol;
    }


    public TProtocol getProtocol(String serviceName){
        TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(getProtocolInternal(),serviceName);
        return multiplexedProtocol;
    }

    
            
           

相關推薦

Thrift連線spring配置透明化呼叫優化

Thrift連線池,spring配置化,透明化呼叫(優化2) 2017.0409優化: 連線池優化:公平鎖替換阻塞執行緒佇列,提升連線池效能,服務端介面設定 200毫秒處理時間 ,1 個selectThread ,1 個 workThread 場景下,客

【專案管理和構建】十分鐘教程eclipse配置maven + 建立maven專案

上篇博文中我們介紹了maven下載、安裝和配置(二),這篇博文我們配置一下eclipse,將它和maven結合,並我們建立一個maven的專案。 準備工作 在eclipse配置maven之前需要我們做好準備工作,如下:  1. 安裝jdk  2. 已安裝好 maven,將maven配置成功  3. 下載

iOS webView的高階用法之JS互動js與oc的相互呼叫JavaScriptCore

重要的事情放前面 github地址:https://github.com/horisea/JSCallOCTest   歡迎star 前言:說起JS互動,很多童鞋會黯然色變,感覺很高深的樣子。大部分小夥伴只知道一種,哪一種我也說說吧。    1.在webView中將要

Spring初始過程原始碼分析1

本文主要詳細分析Spring初始化過程的原始碼分析,目的是理解Spring具體是如何工作的。部分內容查閱於網路,有不妥之處望指正。 1、web專案中伺服器一啟動就開始載入web.xml,Spring的啟動是從web.xml中的org.springframewo

基於Druid資料庫連線的資料來源配置資料庫連線密碼加密解密

Druid的資料庫連線池配置。 <!-- 基於Druid資料庫連線池的資料來源配置 --> <bean id="dataSource" class="com.alibaba.drui

spring-data-mono java註解方式mongo連線帶認證配置

import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import or

2.SpringMVC+Spring+Mybatis整合2 配置web.xmlspring-servlet.xmlapplicationContext.xml

web spring-servlet 在 webapp WEB-INF下 applicationContext 在resource資料夾下 <?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:

Spring連線的常用配置

1、連線池概述   資料庫連線是一種關鍵的有限的昂貴的資源,這一點在多使用者的網頁應用程式中體現得尤為突出。對資料庫連線的管理能顯著影響到整個 應用程式的伸縮性和健壯性,影響到程式的效能指標。資料庫連線池正是針對這個問題提出來的。   資料庫連線池負責分配、管理和釋放資料庫連線,它允許應用程式重複使用一個現

【最近面試遇到的一些問題】資料庫連線的優點和原理常用的java開源連線元件

資料庫連線是一種關鍵的有限的昂貴的資源,這一點在多使用者的網頁應用程式中體現得尤為突出。對資料庫連線的管理能顯著影響到整個應用程式的伸縮性和健壯性,影響到程式的效能指標。資料庫連線池正是針對這個問題提出來的。資料庫連線池負責分配、管理和釋放資料庫連線,它允許應用程式重複使用

系統啟動時spring配置檔案解析失敗報”cvc-elt.1: 找不到元素 'beans' 的宣告“異常

轉自: http://blog.163.com/[email protected]/blog/static/86556803201342210243656/ 現象:Tomcat啟動時,spring載入配置檔案applicationContext.xml出錯,丟

Spring配置事務Junit單元測試報錯"Failed to load ApplicationContext"

問題: Junit單元測試程式碼如下: package cn.muke.spring.demo2; import javax.annotation.Resource; import org.junit.Test; import org.junit.runner.RunWith; imp

資料庫連線 Connection Pool 是什麼做什麼

         重新拾起Java來學,就遇到了一本不錯的書《Hibernate 深入淺出》電子工業出版社,相較其他的工具類書,本書中用詞更加生動活潑,從字句之間就可以看出作者的用心與深厚的文字功底,讓人相信,此書值得一看。         說道資料庫,頭腦中一下子就想到了

分別在Tomcatspring初始時註冊監聽器

Tomcat,spring初始化時載入自定義監聽器方法 Tomcat初始化時載入自定義監聽器方法:   1.建立一個監聽器MyListener繼承ServletContextListener //@Component //監聽器是web層的元件,它是tomcat例項化的,

Spring Boot下Druid連線的使用配置分析

引言: 在Spring Boot下預設提供了若干種可用的連線池,Druid來自於阿里系的一個開源連線池,在連線池之外,還提供了非常優秀的監控功能,這裡講解如何與Spring Boot實現整合。 1.  環境描述      Spring Boot 1.4.0.RELEASE

Spring boot 專案分環境 Maven 打包動態配置檔案動態配置專案

Spring boot Maven 專案打包 使用Maven 實現多環境 test dev prod 打包 專案的結構 在下圖中可用看出,我們打包時各個環境需要分開,採用 application-環境.yml 的方式命名 環境配置開始 首先我們需要在app

各版本lettuce sentinel spring整合流程連線、哨兵配置

spring-data-2與其上一個版本1.8是一個分水嶺,2.0用的是io.lettuce:lettuce-core,2.0之前的spring-data用的是biz.paluch.redis:lettuce spring-data-2.0以上版本配置

三大框架開發時spring配置檔案出現org.springframework.transaction.interceptor.TransactionInterceptor.invoke異常

在最近利用三大框架進行專案開發時,spring配置檔案裡出現了一個橘黃色的雙向箭頭,滑鼠放上去,會提示你advised by  org.springframework.transaction.int

spring boot 學習(四)Druid連線的使用配置

spring boot下Druid連線池的使用配置 Druid介紹 Druid是一個JDBC元件,druid 是阿里開源在 github 上面的資料庫連線池,它包括三部分: * DruidDriver 代理Driver,能夠提供基於Filter-

隨web容器啟動Java類spring初始某個類的方法

之前有篇文章講過隨web容器啟動某個Java類的某個方法的實現,具體有兩種方式,參考原文章:http://blog.csdn.net/u010523770/article/details/44677447 但是在我的web專案中遇到了這樣一個問題,我在需要隨web容器的類的方法中加入了一個while(true

NSIS隱藏窗體標題欄自帶的button最大化最小關閉X)

簡單 ini tex call 定義 ongui col 分享 http 這個問題實在八月份逛csdn論壇的時候偶然遇到的,當時比較好奇樓主為啥要隱藏關閉button。就順口問了下,結果樓主已經棄樓。未給出原因,猜著可能是為了做自己定義頁面美化,無法改變按紐外觀之類的,