Java多執行緒批量處理、執行緒池的使用
1、引言
在開發中,有時會遇到批量處理的業務。如果單執行緒處理,速度會非常慢,可能會導致上游超時。這是就需要使用多執行緒開發。
建立執行緒時,應當使用執行緒池。一方面避免了處理任務時建立銷燬執行緒開銷的代價,另一方面避免了執行緒數量膨脹導致的過分排程問題,保證了對核心的充分利用。
可以使用J.U.C提供的執行緒池:ThreadPoolExecutor類。在Spring框架中,也可以使用ThreadPoolTaskExecutor類。ThreadPoolTaskExecutor其實是對ThreadPoolExecutor的一種封裝。
2、使用ThreadPoolExecutor類
假設現有業務,輸入Input類,輸出Output類:
@Data @AllArgsConstructor public class Input { int i; } @Data @AllArgsConstructor public class Output { boolean success; String s; }
這裡@Data與@AllArgsConstrutor使用了Lombok工具
處理方法:
public Output singleProcess(Input input) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace();return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)) }
現在該業務需要批量處理,輸入List<Input>,輸出List<Output>。那麼可以建立一個核心執行緒數為4的執行緒池,每個執行緒把執行結果新增到執行緒安全的List中。這裡List應當使用SynchronizedList而不是CopyOnWriteArrayList,因為這裡寫操作有多次,而讀操作只有一次。並使用CountDownLatch等待所有執行緒執行結束:
public List<Output> multiProcess(List<Input> inputList) { ExecutorService executorService = Executors.newFixedThreadPool(4); CountDownLatch countDownLatch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executorService.submit(() -> { try { // 單個處理 Output output = singleProcess(input); outputList.add(ouput); } catch (Exception e) { // 處理異常 } finally { countDownLatch.countDown(); } }) } // 等待所有執行緒執行完成 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; }
但是這樣還是有很大的問題:
- 阿里巴巴開發手冊不建議我們使用Executors建立執行緒池,因為Executors.newFixedThreadPool方法沒有限制執行緒佇列的容量,如果input數量過多,可能導致OOM。
- multiProcess不適合被多次呼叫,不適合用在大多數業務場景。
3、在Spring框架中使用ThreadPoolTaskExecutor類
為了應對大多數業務場景,配合Spring Boot框架,我們可以使用ThreadPoolTaskExecutor建立執行緒池,並把它注入到ioc容器中,全域性都可以使用。
首先,配置執行緒池引數
@Data @Component @ConfigurationProperties(prefix = "thread-pool") public class ThreadPoolProperties { private int corePoolSize; private int maxPoolSize; private int queueCapacity; private int keepAliveSeconds; }
在配置檔案application.yml中
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
這裡執行緒池各引數的意義可以參考Java執行緒池實現原理及其在美團業務中的實踐
其次,將ThreadPoolTaskExecutor加入至ioc容器中
@EnableAsync @Configuration public class ThreadPoolConfig { private final ThreadPoolProperties threadPoolProperties; @Autowired public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) { this.threadPoolProperties = threadPoolProperties; } @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(threadPoolProperties.getCorePoolSize()); executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); executor.setQueueCapacity(threadPoolProperties.getQueueCapacity()); executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds()); executor.setThreadNamePrefix("thread-pool-"); return executor; } }
這裡@EnableAsync是與@Async配合使用,用於執行非同步任務,後面會給出示例
最後,在業務類中通過自定義SpringUtils類獲取bean或使用@Async,來使用執行緒池。
/** * 業務實現類 */ @Service @Slf4j public class Input2OutputServiceImpl implements Input2OutputService { /** * 單個處理 * @param input 輸入物件 * @return 輸出物件 */ @Override public Output singleProcess(Input input) { log.info("Processing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)); } /** * 批量處理 * @param inputList 輸入物件列表 * @return 輸出物件列表 */ @Override public List<Output> multiProcess(List<Input> inputList) { ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class); CountDownLatch latch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executor.execute(() -> { try { Output output = singleProcess(input); outputList.add(output); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; } /** * 非同步處理 * @param input 輸入物件 * @return 輸出Future物件 */ @Async("threadPoolTaskExecutor") @Override public Future<Output> asyncProcess(Input input) { return new AsyncResult<>(singleProcess(input)); } }
以上程式碼的完整程式碼包括測試程式碼在筆者的GitHub專案thread-pool-demo,在專案中用到ThreadPoolTaskExecutor可參考。