1. 程式人生 > 其它 >Java多執行緒批量處理、執行緒池的使用

Java多執行緒批量處理、執行緒池的使用

1、引言

在開發中,有時會遇到批量處理的業務。如果單執行緒處理,速度會非常慢,可能會導致上游超時。這是就需要使用多執行緒開發。

建立執行緒時,應當使用執行緒池。一方面避免了處理任務時建立銷燬執行緒開銷的代價,另一方面避免了執行緒數量膨脹導致的過分排程問題,保證了對核心的充分利用。

可以使用J.U.C提供的執行緒池:ThreadPoolExecutor類。在Spring框架中,也可以使用ThreadPoolTaskExecutor類。ThreadPoolTaskExecutor其實是對ThreadPoolExecutor的一種封裝。

2、使用ThreadPoolExecutor類

假設現有業務,輸入Input類,輸出Output類:

@Data
@AllArgsConstructor
public class Input {
    int i;
}

@Data
@AllArgsConstructor
public class Output {
    boolean success;
    String s;
}

這裡@Data與@AllArgsConstrutor使用了Lombok工具

處理方法:

public Output singleProcess(Input input) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        
return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)) }

現在該業務需要批量處理,輸入List<Input>,輸出List<Output>。那麼可以建立一個核心執行緒數為4的執行緒池,每個執行緒把執行結果新增到執行緒安全的List中。這裡List應當使用SynchronizedList而不是CopyOnWriteArrayList,因為這裡寫操作有多次,而讀操作只有一次。並使用CountDownLatch等待所有執行緒執行結束:

public List<Output> multiProcess(List<Input> inputList) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    CountDownLatch countDownLatch = new CountDownLatch(inputList.size());
    List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

    for (Input input : inputList) {
        executorService.submit(() -> {
            try {
                // 單個處理
                Output output = singleProcess(input);
                outputList.add(ouput);
            } catch (Exception e) {
                // 處理異常
            } finally {
                countDownLatch.countDown();
            }
        })
    }

    // 等待所有執行緒執行完成
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return outputList;
}

但是這樣還是有很大的問題:

  1. 阿里巴巴開發手冊不建議我們使用Executors建立執行緒池,因為Executors.newFixedThreadPool方法沒有限制執行緒佇列的容量,如果input數量過多,可能導致OOM。
  2. multiProcess不適合被多次呼叫,不適合用在大多數業務場景。

3、在Spring框架中使用ThreadPoolTaskExecutor類

為了應對大多數業務場景,配合Spring Boot框架,我們可以使用ThreadPoolTaskExecutor建立執行緒池,並把它注入到ioc容器中,全域性都可以使用。

首先,配置執行緒池引數

@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
    private int corePoolSize;
    private int maxPoolSize;
    private int queueCapacity;
    private int keepAliveSeconds;
}

  

在配置檔案application.yml中

thread-pool:
  core-pool-size: 4
  max-pool-size: 16
  queue-capacity: 80
  keep-alive-seconds: 120

這裡執行緒池各引數的意義可以參考Java執行緒池實現原理及其在美團業務中的實踐

其次,將ThreadPoolTaskExecutor加入至ioc容器中

  

@EnableAsync
@Configuration
public class ThreadPoolConfig {
    private final ThreadPoolProperties threadPoolProperties;

    @Autowired
    public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
        this.threadPoolProperties = threadPoolProperties;
    }

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        executor.setThreadNamePrefix("thread-pool-");
        return executor;
    }
}

這裡@EnableAsync是與@Async配合使用,用於執行非同步任務,後面會給出示例

最後,在業務類中通過自定義SpringUtils類獲取bean或使用@Async,來使用執行緒池。

/**
 * 業務實現類
 */
@Service
@Slf4j
public class Input2OutputServiceImpl implements Input2OutputService {
    /**
     * 單個處理
     * @param input 輸入物件
     * @return 輸出物件
     */
    @Override
    public Output singleProcess(Input input) {
        log.info("Processing...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return new Output(false, null);
        }
        return new Output(true, String.valueOf(2 * input.getI() + 1));
    }

    /**
     * 批量處理
     * @param inputList 輸入物件列表
     * @return 輸出物件列表
     */
    @Override
    public List<Output> multiProcess(List<Input> inputList) {
        ThreadPoolTaskExecutor executor
                = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
        CountDownLatch latch = new CountDownLatch(inputList.size());
        List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

        for (Input input : inputList) {
            executor.execute(() -> {
                try {
                    Output output = singleProcess(input);
                    outputList.add(output);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return outputList;
    }

    /**
     * 非同步處理
     * @param input 輸入物件
     * @return 輸出Future物件
     */
    @Async("threadPoolTaskExecutor")
    @Override
    public Future<Output> asyncProcess(Input input) {
        return new AsyncResult<>(singleProcess(input));
    }
}

以上程式碼的完整程式碼包括測試程式碼在筆者的GitHub專案thread-pool-demo,在專案中用到ThreadPoolTaskExecutor可參考。