使用Scala實現一個併發庫(阻塞版本, 下一篇文章提供NonBlocking版本)
阿新 • 發佈:2019-02-11
這個例子來源於scala聖經級教程《Functional Programming in Scala》,由於本人跟著書中的程式碼敲了一遍,然後寫了點測試程式碼驗證了一下正確性,所以就放在這做個備忘吧。貼出來只是為了方便自己,如果看不懂,但是又感興趣的就去看原書吧……
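> 注:這個併發庫使用的執行緒池如果只有唯一一條工作執行緒,`fork` 會導致死鎖(執行緒永久阻塞),可以參考 main 方法中的示例。原因與程式碼中 `fork` 的實現細節有關:外層 `Callable` 佔用了唯一的工作執行緒,並在其中同步等待(`get`)內層任務的結果,而內層任務卻排在佇列中等待這條執行緒空閒,兩者互相等待。更一般地,任何固定大小的執行緒池,只要巢狀的 `fork` 層數超過執行緒數,都可能發生同樣的死鎖。所以,本人會在下一篇文章提供一個非阻塞的版本。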
package parallelism
import java.util.concurrent._
object Par {
  // `toParOps` below is an implicit conversion; importing the language feature here
  // silences the compiler's feature warning for this object only.
  import scala.language.implicitConversions

  /** A parallel computation: given an `ExecutorService`, it produces a `Future` of its result. */
  type Par[A] = ExecutorService => Future[A]

  /** Runs `a` against the executor `s` and returns the `Future` it produces. */
  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  /**
   * `unit` is represented as a function that returns a `UnitFuture`,
   * which is a simple implementation of `Future` that just wraps a constant value.
   * It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled.
   * Its `get` method simply returns the value that we gave it.
   */
  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  /** A `Future` that is already completed with `get`; both `get` overloads return it immediately. */
  private case class UnitFuture[A](get: A) extends Future[A] {
    override def cancel(mayInterruptIfRunning: Boolean) = false
    override def isCancelled = false
    override def isDone = true
    // The timeout is irrelevant: the value is already available.
    override def get(timeout: Long, unit: TimeUnit) = get
  }

  /**
   * Combines the results of `a` and `b` with `f`.
   *
   * `map2` doesn't evaluate the call to `f` in a separate logical thread,
   * in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism.
   * We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
   *
   * Note: whoever applies the resulting `Par` to an executor blocks on `a`'s and then `b`'s `Future`.
   */
  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] = (es: ExecutorService) => {
    val af = a(es)
    val bf = b(es)
    // This implementation of `map2` does _not_ respect timeouts.
    // It simply passes the `ExecutorService` on to both `Par` values,
    // waits for the results of the Futures `af` and `bf`,
    // applies `f` to them, and wraps them in a `UnitFuture`.
    // In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`,
    // then subtracts that time from the available time allocated for evaluating `bf`.
    UnitFuture(f(af.get, bf.get))
  }

  /**
   * Marks `a` for evaluation in a separate logical thread submitted to the executor.
   *
   * This is the simplest and most natural implementation of `fork`,
   * but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete.
   * Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`,
   * this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice.
   *
   * Worse, on a fixed-size pool this can deadlock. For example, `fork(fork(unit(1)))` on a
   * one-thread pool: the outer task holds the only thread while waiting on the inner task queued behind it.
   */
  def fork[A](a: => Par[A]): Par[A] = es => es.submit(
    new Callable[A] {
      override def call() = a(es).get
    })

  /** Wraps the unevaluated `a` so that it is computed asynchronously via `fork`. */
  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  /** Lifts `f` into a function whose result is computed asynchronously. */
  def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a))

  /** Applies `f` to the result of `pa` (implemented via `map2` with a dummy second argument). */
  def map[A, B](pa: Par[A])(f: A => B): Par[B] = map2(pa, unit(()))((a, _) => f(a))

  /** Sorts the list produced by `parList`. */
  def sortPar(parList: Par[List[Int]]): Par[List[Int]] = map(parList)(_.sorted)

  /** Collects a list of `Par`s into a `Par` of a list, without forking. */
  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] = l.foldRight[Par[List[A]]](unit(List[A]()))((h, t) => map2(h, t)(_ :: _))

  /**
   * This implementation forks the recursive step off to a new logical thread,
   * making it effectively tail-recursive. However, we are constructing
   * a right-nested parallel program, and we can get better performance by
   * dividing the list in half, and running both halves in parallel.
   * See `sequenceBalanced` below.
   */
  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = as match {
    case Nil => unit(Nil)
    case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
  }

  /**
   * We define `sequenceBalanced` using `IndexedSeq`, which provides an
   * efficient function for splitting the sequence in half.
   */
  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l, r) = as.splitAt(as.length / 2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

  /** Collects a list of `Par`s into a `Par` of a list, using the balanced (halving) strategy. */
  def sequence[A](as: List[Par[A]]): Par[List[A]] = map(sequenceBalanced(as.toIndexedSeq))(_.toList)

  /** Keeps the elements of `l` satisfying `f`, evaluating `f` on each element asynchronously; order is preserved. */
  def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
    val pars: List[Par[List[A]]] = l map (asyncF((a: A) => if (f(a)) List(a) else List()))
    map(sequence(pars))(_.flatten)
  }

  /** Runs `p` and `p2` on `e`, blocking on each, and compares their results with `==`. */
  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean = p(e).get == p2(e).get

  /** Defers construction of `fa` until the `Par` is run, on the calling thread (no fork). */
  def delay[A](fa: => Par[A]): Par[A] = es => fa(es)

  /** Runs `t` if `cond` yields true, otherwise `f`. Notice we are blocking on the result of `cond`. */
  def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] = es => if (run(es)(cond).get) t(es) else f(es)

  /**
   * Runs `choices(i)`, where `i` is the result of `n` (blocking on `n`).
   * An out-of-range `i` makes `List.apply` throw.
   */
  def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
    es => {
      val ind = run(es)(n).get
      run(es)(choices(ind))
    }

  /** `choice` expressed via `choiceN`: true selects index 0 (`ifTrue`), false selects index 1 (`ifFalse`). */
  def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A])(ifFalse: Par[A]): Par[A] = choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))

  /**
   * Runs the `Par` stored under the key produced by `key` (blocking on `key`).
   * A key absent from `choices` makes `Map.apply` throw.
   */
  def choiceMap[K, V](key: Par[K])(choices: Map[K, Par[V]]): Par[V] =
    es => {
      val kVal = run(es)(key).get
      run(es)(choices(kVal))
    }

  /** Runs `p` (blocking on it), then runs the `Par` that `choices` selects for its result. */
  def chooser[A, B](p: Par[A])(choices: A => Par[B]): Par[B] =
    es => {
      val k = run(es)(p).get
      run(es)(choices(k))
    }

  /**
   * `chooser` is usually called `flatMap` or `bind`,
   * and `flatMap` is the more general abstraction behind chooser/choiceMap/choice/choiceN.
   * It is identical to `chooser`, so it simply delegates to it.
   */
  def flatMap[A, B](p: Par[A])(choices: A => Par[B]): Par[B] = chooser(p)(choices)

  /**
   * `choice` via `flatMap`.
   * Note: despite its name, `f` is the branch taken when `p` is true (it plays the role of `choice`'s `t`).
   */
  def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] = flatMap(p)(b => if (b) f else t)

  /** `choiceN` via `flatMap`. */
  def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] = flatMap(p)(i => choices(i))

  /** `join` via `flatMap`. */
  def joinViaFlatMap[A](pp: Par[Par[A]]): Par[A] = flatMap(pp)(p => p)

  /** Flattens a nested `Par`: runs the outer one (blocking on it), then runs the inner one it yields. */
  def join[A](a: Par[Par[A]]): Par[A] = es => run(es)(run(es)(a).get())

  /** `flatMap` via `join` and `map`. */
  def flatMapViaJoin[A, B](p: Par[A])(f: A => Par[B]): Par[B] = join(map(p)(f))

  /* Gives us infix syntax for `Par`. */
  implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p)

  /**
   * Infix syntax for `Par`, e.g. `pa.map(f)`, `pa.map2(pb)(g)`, `pa.flatMap(h)` or `pa.run(es)`.
   * Every method delegates to the `Par.` function of the same name. The names are qualified
   * with `Par.` because, inside this class, the unqualified names would resolve to these members.
   */
  class ParOps[A](p: Par[A]) {
    def run(s: ExecutorService): Future[A] = Par.run(s)(p)
    def map[B](f: A => B): Par[B] = Par.map(p)(f)
    def map2[B, C](b: Par[B])(f: (A, B) => C): Par[C] = Par.map2(p, b)(f)
    def flatMap[B](f: A => Par[B]): Par[B] = Par.flatMap(p)(f)
  }
}
object Examples {
  import Par._

  /** Sums `ints` by recursively splitting it in half; an empty sequence sums to 0. */
  def sum(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1) ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length / 2)
      sum(l) + sum(r)
    }

  /**
   * Exercises `Par` on a two-thread pool, then demonstrates that `fork` deadlocks on a one-thread pool.
   *
   * The deadlock demo waits with a timeout and then calls `shutdownNow()`. The program therefore
   * reports the deadlock and terminates, instead of hanging the JVM forever.
   */
  def main(args: Array[String]): Unit = {
    println(sum(Range(0, 10)))

    val es = Executors.newFixedThreadPool(2)
    // The lambda parameter is named `pool` so it does not shadow the outer `es`.
    val parInt: Par[Int] = pool => pool.submit(new Callable[Int] {
      override def call() = ((System.currentTimeMillis + 1.0) / System.currentTimeMillis).toInt
    })
    val parString: Par[String] = pool => pool.submit(new Callable[String] {
      override def call() = " |<*>| " * 3
    })
    val parIntMap = Par.map(parInt)((a: Int) => a * 8)
    val parMap2 = Par.map2[Int, String, (Int, String)](parInt, parString)((_ -> _))

    try {
      println(run(es)(parInt).get)
      println(run(es)(parIntMap).get)
      println(run(es)(parMap2).get)
      // Does NOT deadlock: `es` has two threads. While the forked outer task blocks on one
      // thread, the inner `parInt`/`parString` tasks still run on the other.
      println(run(es)(fork(parMap2)).get)
    } finally {
      es.shutdown()
    }

    // With a single worker, the forked outer task occupies the only thread while it blocks
    // waiting for `parInt`/`parString`, which sit in the queue behind it: a deadlock.
    // (With `Executors.newFixedThreadPool(2)` here, the same call completes normally.)
    val deadlockTimeoutSeconds = 1L
    val singleThreadEs = Executors.newFixedThreadPool(1)
    try {
      println(run(singleThreadEs)(fork(parMap2)).get(deadlockTimeoutSeconds, TimeUnit.SECONDS))
    } catch {
      case _: TimeoutException =>
        println("fork(parMap2) deadlocked on a single-thread pool: the only worker is blocked waiting on tasks queued behind it")
    } finally {
      // `shutdownNow` interrupts the worker stuck in `Future.get` so the JVM can exit;
      // plain `shutdown` would wait forever for the deadlocked task.
      singleThreadEs.shutdownNow()
    }
  }
}
上述程式碼的執行結果是:
45
1
8
(1, |<*>| |<*>| |<*>| )
(1, |<*>| |<*>| |<*>| )
879675643@qq.com lhever
.---.
| | . __.....__ .----. .----. __.....__
| | .'| .-'' '. \ \ / /.-'' '.
| |< | / .-''"'-. `. ' '. /' // .-''"'-. `. .-,.--.
| | | | / /________\ \| |' // /________\ \| .-. |
| | | | .'''-. | || || || || | | |
| | | |/.'''. \\ .-------------''. `' .'\ .-------------'| | | |
| | | / | | \ '-.____...---. \ / \ '-.____...---.| | '-
| | | | | | `. .' \ / `. .' | |
'---' | | | | `''-...... -' '----' `''-...... -' | |
| '. | '. |_|
'---' '---'