1. 程式人生 > 程式設計 >Scala併發程式設計實戰:Executor執行緒池

Scala併發程式設計實戰:Executor執行緒池

建立執行緒是一個重量級操作,因為需要呼叫作業系統核心的API,所以最好不要頻繁的建立和銷燬執行緒,為了能夠複用建立的執行緒,常用的辦法的就是建立執行緒池。

Executor

java.util.concurren包中提供了若干介面和類來實現執行緒池,最常用的有Executor,ExecutorService,ThreadPoolExecutor。

Executor介面很簡單定義如下:

public interface Executor {
    void execute(Runnable command);
}複製程式碼

這個介面的目的在於將任務與執行機制解耦,使得使用者不需要手動建立執行緒,只要交給Executor就行了。

ExecutorService

ExecutorService介面則擴充套件了Executor介面,增加了若干實用的方法,最常用的兩個方法:

//關閉執行緒池
void shutdown();
//提交Callable任務以獲取返回值
<T> Future<T> submit(Callable<T> task);複製程式碼

AbstractExecutorService抽象類是ExecutorService的實現,實現了若干模板方法。

最重要的類莫過於ThreadPoolExecutor,它是最最常用的ExecutorService實現類,下面重點說說。

ThreadPoolExecutor

ThreadPoolExecutor在構造時可以指定的引數最多有7個,另外還有3個使用一些預設引數的簡化版本。

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)複製程式碼
  • corePoolSize 是保留的核心執行緒數,即使執行緒處於空閒也不會被回收,除非設定了allowCoreThreadTimeOut屬性。
  • maximumPoolSize 最大執行緒數。當workQueue滿了,會給新提交的任務建立新執行緒,這種情況下執行緒數會超過corePoolSize,但整個執行緒池的執行緒數必須有個上限,就是maximumPoolSize了。
  • keepAliveTime 回收執行緒前,允許保留空閒執行緒的時長。
  • workQueue 儲存提交的任務的佇列
  • threadFactory 建立執行緒的工廠類(ThreadFactory這個介面就定義了一個方法Thread newThread(Runnable r);)
  • handler handler用於沒有可用執行緒(執行緒數達到最大值,沒有空閒執行緒)且workQueue佇列滿了的時候。

ThreadPoolExecutor 已經提供了以下 4 種策略。

CallerRunsPolicy:提交任務的執行緒自己去執行該任務。

AbortPolicy:預設的拒絕策略,會 throws RejectedExecutionException。

DiscardPolicy:直接丟棄任務,沒有任何異常丟擲。

DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作佇列的任務丟棄,然後把新任務加入到工作佇列。

ThreadPoolExecutory的建構函式一共有四種,使得使用者可以省略threadFactory和handler中的一個或兩個。

需要注意的情況當maximumPoolSize>corePoolSize時,如果workQueue滿了,新提交的任務會被新執行緒馬上執行,而之前提交的在佇列中等待的佇列則繼續等待。也就是說後提交的任務可能先執行了。當新執行緒執行完新提交的這個任務後,會轉去執行佇列中的資料,這時消費任務佇列的執行緒數可能會大於corePoolSize,消費速度加快了。下面做個實驗。

package io.github.liam8.con

import java.util.concurrent.{ArrayBlockingQueue,Callable,Future,RejectedExecutionException,ThreadPoolExecutor,TimeUnit}

object ExecutorDemo {

  def main(args: Array[String]): Unit = {
    // corePoolSize=1,maximumPoolSize=2,queue capacity=1
    val executor = new ThreadPoolExecutor(
      1,2,10,TimeUnit.SECONDS,new ArrayBlockingQueue[Runnable](1)
    )
    val task1 = new Runnable {
      override def run(): Unit = {
        println("task1 running")
        Thread.sleep(3000)
        println("task1 complete")
      }
    }
    val task2 = new Runnable {
      override def run(): Unit = {
        println("task2 running")
        Thread.sleep(3000)
        println("task2 complete")
      }
    }
    val task3 = new Callable[String] {
      override def call(): String = {
        println("task3 running")
        Thread.sleep(3000)
        println("task3 complete")
        "xxx"
      }
    }
    val task4 = new Runnable {
      override def run(): Unit = {
        println("task4 running")
        Thread.sleep(3000)
        println("task4 complete")
      }
    }
    var task2Result: Future[String] = null
    var taskCount = 1
    try {
      executor.execute(task1)
      println("task1 submitted")
      taskCount += 1
      executor.execute(task2)
      println("task2 submitted")
      taskCount += 1
      task2Result = executor.submit(task3)
      println("task3 submitted")
      taskCount += 1
      executor.execute(task4)
      println("task4 submitted")
    } catch {
      case e: RejectedExecutionException => println(s"task $taskCount be rejected")
    }
    // 起一個執行緒跟蹤執行緒池大小
    val th = new Thread {
      var threadNum = 0

      override def run(): Unit =
        while (true) {
          if (executor.getPoolSize != threadNum) {
            threadNum = executor.getPoolSize
            println("pool size:" + threadNum)
          }
          Thread.sleep(100)
        }
    }
    th.setDaemon(true)
    th.start()
    if (task2Result != null) {
      println(task2Result.get(7,TimeUnit.SECONDS))
    }
    Thread.sleep(5000)
    executor.shutdown()

  }


}

複製程式碼

output

task1 running
task1 submitted
task2 submitted
task3 submitted
task3 running  //task3在task2之前運行了!
task 4 be rejected // 執行緒數達到最大值,任務佇列也滿了,task4被拒絕(預設的handler)
pool size:2
task1 complete
task3 complete
xxx
task2 running // 空閒的執行緒開始消費佇列
task2 complete
pool size:0複製程式碼

Executors

Executors是JUC包中的一個靜態工廠類,其中除了newFixedThreadPool,newSingleThreadExecutor方法,其他方法都不推薦使用,因為其他方法建立的執行緒池使用的是無界佇列,可能會佔用過多記憶體,甚至OOM,所以建議使用有界佇列。

ExecutionContext

Scala另外提供了ExecutionContext和Future來簡化執行緒池的使用,Future可以接受一個ExecutionContext型別的隱式引數,將傳入的函式提交到ExecutionContext的執行緒池中執行。下面舉個栗子,不做深入探討。

package io.github.liam8.con

import java.util.concurrent.Executors

import scala.concurrent.{Await,ExecutionContext,ExecutionContextExecutorService,Future}
import scala.concurrent.duration._

object ExecutionContextDemo {

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)

    val f = Future {
      val t = Thread.currentThread().getName
      println(s"$t: future is coming")
      123
    }

    val re = f.map(r => {
      val t = Thread.currentThread().getName
      println(s"$t: mapping")
      r * r
    })
    re.onSuccess { case x: Int => println(x) }

    Await.result(f,3.seconds)
    ec.shutdown()
  }

}
複製程式碼

output

pool-1-thread-1: future is coming
pool-1-thread-2: mapping
15129複製程式碼

參考文獻

Executor與執行緒池:如何建立正確的執行緒池?

Futures Made Easy with Scala

本文程式碼

Github倉庫

轉載請註明原文地址:liam-blog.ml/2019/09/22/…