1. 程式人生 > >moquette改造筆記(三):優化BrokerInterceptor 中的執行緒池

moquette改造筆記(三):優化BrokerInterceptor 中的執行緒池

發現問題

在io.moquette.spi.impl.BrokerInterceptor的建構函式中,新建了一個執行緒池,程式碼如下:

private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }
        executor = Executors.newFixedThreadPool(poolSize);
    }

executor = Executors.newFixedThreadPool(poolSize);這句程式碼雖然建立了一個固定執行緒數量的執行緒池,但是執行緒池的任務佇列並沒有做限制,一旦某個InterceptHandler中的某個方法進行了耗時處理,在高併發的情況下,會很容易導致執行緒池的佇列堆積大量待處理的任務,進而可能造成記憶體溢位。

解決問題

分別新增以下類和介面

public class ThreadPoolHelper {
    public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) {
        return new ThreadPoolExecutor(
                threadNum,
                threadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(capacity),
                new SimpleThreadFactory(threadFactoryName),
                new LogDiscardRejectPolicy()
        );
    }
}

public class SimpleThreadFactory implements ThreadFactory {
    private static final String NAME_FORMAT = "%s-%s";
    private String threadNamePrefix;

    public SimpleThreadFactory(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis()));
        return thread;
    }
}


public class LogDiscardRejectPolicy implements RejectedExecutionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r);
        if (!(r instanceof PublishTask)) {
            return;
        }

        PublishTask publishTask = (PublishTask) r;
        InterceptHandler interceptHandler = publishTask.getInterceptHandler();
        if (!(interceptHandler instanceof RejectHandler)) {
            return;
        }

        ((RejectHandler)interceptHandler).rejectedExecution(r,executor);
    }
}


public interface RejectHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

BrokerInterceptor 建立執行緒池的邏輯改為

private BrokerInterceptor(int poolSize, int capacity, List<InterceptHandler> handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }

        /** modify by liuhh */
        executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME);
        //executor = Executors.newFixedThreadPool(poolSize);

    }

解釋:(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的執行緒池指定任務佇列大小和拒絕策略LogDiscardRejectPolicy(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務log一遍,對於PublishTask(在上一篇中優化publish邏輯時新增的)做特殊處理,會交給實現RejectHandler的InterceptHandler處理,由業務邏輯處理,出現任務太多處理不完被遺棄的任務該如何處理

修改InterceptHandler的實現,實現RejectHandler介面

moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現,新增對RejectHandler的實現如下

@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler{

    @Override
    public String getID() {
        return SafetyInterceptHandler.class.getName();
    }

    @Override
    public void onConnect(InterceptConnectMessage msg) {
        
    }

    @Override
    public void onConnectionLost(InterceptConnectionLostMessage msg) {
       
    }


    @Override
    public void onPublish(InterceptPublishMessage msg) {
       
    }


    @Override
    public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        /**MQTT SERVICE 負載過大,處理不過來時,會回撥該方法*/
        //例如可以發生郵件通知相關人員
    }

}