moquette改造筆記(三):優化BrokerInterceptor 中的執行緒池
阿新 • • 發佈:2018-12-10
發現問題
在io.moquette.spi.impl.BrokerInterceptor的建構函式中,新建了一個執行緒池,程式碼如下:
private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } executor = Executors.newFixedThreadPool(poolSize); }
executor = Executors.newFixedThreadPool(poolSize);這句程式碼雖然建立了一個固定執行緒數量的執行緒池,但是執行緒池的任務佇列並沒有做限制,一旦某個InterceptHandler中的某個方法進行了耗時處理,在高併發的情況下,會很容易導致執行緒池的佇列堆積大量待處理的任務,進而可能造成記憶體溢位。
解決問題
分別新增以下類和介面
public class ThreadPoolHelper { public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) { return new ThreadPoolExecutor( threadNum, threadNum, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity), new SimpleThreadFactory(threadFactoryName), new LogDiscardRejectPolicy() ); } } public class SimpleThreadFactory implements ThreadFactory { private static final String NAME_FORMAT = "%s-%s"; private String threadNamePrefix; public SimpleThreadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable); thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis())); return thread; } } public class LogDiscardRejectPolicy implements RejectedExecutionHandler { private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r); if (!(r instanceof PublishTask)) { return; } PublishTask publishTask = (PublishTask) r; InterceptHandler interceptHandler = publishTask.getInterceptHandler(); if (!(interceptHandler instanceof RejectHandler)) { return; } ((RejectHandler)interceptHandler).rejectedExecution(r,executor); } } public interface RejectHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
BrokerInterceptor 建立執行緒池的邏輯改為
private BrokerInterceptor(int poolSize, int capacity, List<InterceptHandler> handlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } /** modify by liuhh */ executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME); //executor = Executors.newFixedThreadPool(poolSize); }
解釋:(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的執行緒池指定任務佇列大小和拒絕策略LogDiscardRejectPolicy(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務log一遍,對於PublishTask(在上一篇中優化publish邏輯時新增的)做特殊處理,會交給實現RejectHandler的InterceptHandler處理,由業務邏輯處理,出現任務太多處理不完被遺棄的任務該如何處理。
修改InterceptHandler的實現,實現RejectHandler介面
在 moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現,新增對RejectHandler的實現如下
@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler{
@Override
public String getID() {
return SafetyInterceptHandler.class.getName();
}
@Override
public void onConnect(InterceptConnectMessage msg) {
}
@Override
public void onConnectionLost(InterceptConnectionLostMessage msg) {
}
@Override
public void onPublish(InterceptPublishMessage msg) {
}
@Override
public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
/**MQTT SERVICE 負載過大,處理不過來時,會回撥該方法*/
//例如可以發生郵件通知相關人員
}
}