Implementing a Concurrency Library in Scala (Non-Blocking Version)
阿新 · Published 2019-02-13
This example comes from the classic Scala text Functional Programming in Scala. I typed in the code from the book myself and wrote a bit of test code to verify that it works, so I'm keeping it here as a reference. It's posted mainly for my own convenience; if you can't follow it but are interested, go read the original book.
> Note: this post is a continuation of the previous article, "Implementing a Concurrency Library in Scala (Blocking Version; the Non-Blocking Version Follows in the Next Post)".
package parallelism

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference
import parallelism.Nonblocking.Future
import language.implicitConversions

object Nonblocking {

  trait Future[+A] {
    private[parallelism] def apply(k: A => Unit): Unit
  }

  type Par[+A] = ExecutorService => Future[A]

  object Par {

    def run[A](es: ExecutorService)(p: Par[A]): A = {
      // A mutable, threadsafe reference, to use for storing the result
      val ref = new java.util.concurrent.atomic.AtomicReference[A]
      // A latch which, when decremented, implies that `ref` has the result
      val latch = new CountDownLatch(1)
      // Asynchronously set the result, and decrement the latch
      p(es) { a => ref.set(a); latch.countDown() }
      // Block until `latch.countDown` is invoked asynchronously
      latch.await()
      // Once we've passed the latch, we know `ref` has been set, and return its value
      ref.get
    }

    def unit[A](a: A): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          cb(a)
      }

    /** A non-strict version of `unit`. */
    def delay[A](a: => A): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          cb(a)
      }

    def fork[A](a: => Par[A]): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          eval(es)(a(es)(cb))
      }

    /**
     * Helper function for constructing `Par` values out of calls to non-blocking
     * continuation-passing-style APIs. This will come in handy in Chapter 13.
     */
    def async[A](f: (A => Unit) => Unit): Par[A] = es => new Future[A] {
      def apply(k: A => Unit) = f(k)
    }
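    // Illustrative sketch (not part of the original listing): `async` can adapt a
    // callback-based API to `Par`. `readFileAsync` below is a hypothetical
    // continuation-passing-style function, not a real library call.
    //
    //   def readFileAsync(path: String)(onDone: String => Unit): Unit = ???
    //   val contents: Par[String] = async[String](cb => readFileAsync("data.txt")(cb))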
    /**
     * Helper function for evaluating an action asynchronously,
     * using the given `ExecutorService`.
     */
    def eval(es: ExecutorService)(r: => Unit): Unit =
      es.submit(new Callable[Unit] {
        def call = r
      })

    def map2[A, B, C](p: Par[A], p2: Par[B])(f: (A, B) => C): Par[C] =
      es => new Future[C] {
        def apply(cb: C => Unit): Unit = {
          var ar: Option[A] = None
          var br: Option[B] = None
          // This implementation is a little too liberal in forking of threads -
          // it forks a new logical thread for the actor and, for stack safety,
          // forks evaluation of the callback `cb`.
          val combiner = Actor[Either[A, B]](es) {
            case Left(a) =>
              if (br.isDefined) eval(es)(cb(f(a, br.get)))
              else ar = Some(a)
            case Right(b) =>
              if (ar.isDefined) eval(es)(cb(f(ar.get, b)))
              else br = Some(b)
          }
          p(es)(a => combiner ! Left(a))
          p2(es)(b => combiner ! Right(b))
        }
      }

    // Specialized version of `map`.
    def map[A, B](p: Par[A])(f: A => B): Par[B] =
      es => new Future[B] {
        def apply(cb: B => Unit): Unit =
          p(es)(a => eval(es) {
            cb(f(a))
          })
      }

    def lazyUnit[A](a: => A): Par[A] =
      fork(unit(a))

    def asyncF[A, B](f: A => B): A => Par[B] =
      a => lazyUnit(f(a))

    def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
      as match {
        case Nil => unit(Nil)
        case h :: t => map2(h, fork(sequence(t)))(_ :: _)
      }

    def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
      if (as.isEmpty) unit(Vector())
      else if (as.length == 1) map(as.head)(a => Vector(a))
      else {
        val (l, r) = as.splitAt(as.length / 2)
        map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
      }
    }

    def sequence[A](as: List[Par[A]]): Par[List[A]] =
      map(sequenceBalanced(as.toIndexedSeq))(_.toList)

    def parMap[A, B](as: List[A])(f: A => B): Par[List[B]] =
      sequence(as.map(asyncF(f)))

    def parMap[A, B](as: IndexedSeq[A])(f: A => B): Par[IndexedSeq[B]] =
      sequenceBalanced(as.map(asyncF(f)))

    // Exercise answers

    /*
     * We can implement `choice` as a new primitive.
     *
     * `p(es)(result => ...)` for some `ExecutorService`, `es`, and
     * some `Par`, `p`, is the idiom for running `p`, and registering
     * a callback to be invoked when its result is available. The
     * result will be bound to `result` in the function passed to
     * `p(es)`.
     *
     * If you find this code difficult to follow, you may want to
     * write down the type of each subexpression and follow the types
     * through the implementation. What is the type of `p(es)`? What
     * about `t(es)`? What about `t(es)(cb)`?
     */
    def choice[A](p: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          p(es) { b =>
            if (b) eval(es) {
              t(es)(cb)
            }
            else eval(es) {
              f(es)(cb)
            }
          }
      }
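    // Illustrative sketch (not part of the original listing): selecting between two
    // computations with `choice`. The names below are invented for the example.
    //
    //   val useCache: Par[Boolean] = lazyUnit(true)
    //   val fromCache: Par[Int] = lazyUnit(42)
    //   val fromDb: Par[Int] = lazyUnit(43)
    //   val result: Par[Int] = choice(useCache)(fromCache, fromDb)   // evaluates fromCache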
    /* The code here is very similar. */
    def choiceN[A](p: Par[Int])(ps: List[Par[A]]): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          p(es) { ind =>
            eval(es) {
              ps(ind)(es)(cb)
            }
          }
      }

    def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] =
      choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))

    def choiceMap[K, V](p: Par[K])(ps: Map[K, Par[V]]): Par[V] =
      es => new Future[V] {
        def apply(cb: V => Unit): Unit =
          p(es)(k => ps(k)(es)(cb))
      }

    /* `chooser` is usually called `flatMap` or `bind`. */
    def chooser[A, B](p: Par[A])(f: A => Par[B]): Par[B] =
      flatMap(p)(f)

    def flatMap[A, B](p: Par[A])(f: A => Par[B]): Par[B] =
      es => new Future[B] {
        def apply(cb: B => Unit): Unit =
          p(es)(a => f(a)(es)(cb))
      }

    def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] =
      flatMap(p)(b => if (b) t else f)

    def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] =
      flatMap(p)(i => choices(i))

    def join[A](p: Par[Par[A]]): Par[A] =
      es => new Future[A] {
        def apply(cb: A => Unit): Unit =
          p(es)(p2 => eval(es) {
            p2(es)(cb)
          })
      }

    def joinViaFlatMap[A](a: Par[Par[A]]): Par[A] =
      flatMap(a)(x => x)

    def flatMapViaJoin[A, B](p: Par[A])(f: A => Par[B]): Par[B] =
      join(map(p)(f))

    /* Gives us infix syntax for `Par`. */
    implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p)

    // Infix versions of `map`, `map2` and `flatMap`.
    class ParOps[A](p: Par[A]) {
      def map[B](f: A => B): Par[B] = Par.map(p)(f)
      def map2[B, C](b: Par[B])(f: (A, B) => C): Par[C] = Par.map2(p, b)(f)
      def flatMap[B](f: A => Par[B]): Par[B] = Par.flatMap(p)(f)
      def zip[B](b: Par[B]): Par[(A, B)] = p.map2(b)((_, _))
    }
  }
  def main(args: Array[String]): Unit = {
    // _print is a debugging helper: it prints the current thread (prefixed with ": " when flag is true)
    val _print = (flag: Boolean) =>
      if (flag) println(": " + Thread.currentThread()) else println(Thread.currentThread())

    val es = Executors.newFixedThreadPool(3)

    val intDelayPar = Par.delay[Int]({
      _print(false)
      math.pow(999, 67).toInt
    })

    intDelayPar(es).apply((a: Int) => {
      _print(true)
      println(a)
    })

    intDelayPar(es)((a: Int) => {
      _print(true)
      println(a)
    })

    Par.fork(intDelayPar)(es)((a: Int) => {
      _print(true)
      println(a)
    })

    Thread.sleep(6000L)
    println("**********************************************")
    Par.run(es)(intDelayPar)
    Thread.sleep(6000L)
    println("**********************************************")

    val _intDelayPar = Par.delay[Int]({
      val future = es.submit(new Callable[Int] {
        _print(true)
        override def call = math.pow(999, 67).toInt
      })
      val res = future.get
      println(res)
      res
    })
    Par.run(es)(_intDelayPar)

    println("-------------------------------------------")

    import scala.language.implicitConversions
    import Par.toParOps
    // Via the implicit conversion, `map` can be used with infix syntax
    (Par.delay(1) map { (a: Int) => 111 * a })(es)(println(_))
    (Par.delay(2) map { (a: Int) => 111 * a })(es)(println(_))
    (Par.delay(3) map { (a: Int) => 111 * a })(es)(println(_))

    es.shutdown()
  }
}
The output of running the code above is:
Thread[main,5,main]
: Thread[main,5,main]
2147483647
Thread[main,5,main]
: Thread[main,5,main]
2147483647
Thread[pool-1-thread-1,5,main]
: Thread[pool-1-thread-1,5,main]
2147483647
**********************************************
Thread[main,5,main]
**********************************************
: Thread[main,5,main]
2147483647
-------------------------------------------
111
222
333
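To round out the test above, here is a minimal, self-contained sketch (not from the original post) of driving the non-blocking `parMap` and `map2` through `Par.run`. The object name `ParMapDemo`, the pool size, and the sample values are all made up for illustration.

package parallelism

import java.util.concurrent.Executors
import Nonblocking.Par

object ParMapDemo {
  def main(args: Array[String]): Unit = {
    val es = Executors.newFixedThreadPool(4)

    // Square each list element in parallel; parMap forks one task per element via asyncF.
    val squares = Par.parMap(List(1, 2, 3, 4, 5))((n: Int) => n * n)

    // Combine two independent parallel computations with map2.
    val combined = Par.map2(Par.lazyUnit(21), Par.lazyUnit(2))(_ * _)

    // run blocks the calling thread (via a CountDownLatch) until the callbacks fire;
    // the worker threads themselves never block, so a small pool is enough.
    println(Par.run(es)(squares))   // List(1, 4, 9, 16, 25)
    println(Par.run(es)(combined))  // 42

    es.shutdown()
  }
}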
The `Actor` source code used by the code above is:
package parallelism
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.{Callable,ExecutorService}
import annotation.tailrec
/*
* Implementation is taken from `scalaz` library, with only minor changes. See:
*
* https://github.com/scalaz/scalaz/blob/scalaz-seven/concurrent/src/main/scala/scalaz/concurrent/Actor.scala
*
* This code is copyright Andriy Plokhotnyuk, Runar Bjarnason, and other contributors,
* and is licensed using 3-clause BSD, see LICENSE file at:
*
* https://github.com/scalaz/scalaz/blob/scalaz-seven/etc/LICENCE
*/
/**
* Processes messages of type `A`, one at a time. Messages are submitted to
* the actor with the method `!`. Processing is typically performed asynchronously;
* this is controlled by the provided `strategy`.
*
* Memory consistency guarantee: when each message is processed by the `handler`, any memory that it
* mutates is guaranteed to be visible by the `handler` when it processes the next message, even if
* the `strategy` runs the invocations of `handler` on separate threads. This is achieved because
* the `Actor` reads a volatile memory location before entering its event loop, and writes to the same
* location before suspending.
*
* Implementation based on non-intrusive MPSC node-based queue, described by Dmitriy Vyukov:
* [[http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue]]
*
* @see scalaz.concurrent.Promise for a use case.
*
* @param handler The message handler
* @param onError Exception handler, called if the message handler throws any `Throwable`.
* @param strategy Execution strategy, for example, a strategy that is backed by an `ExecutorService`
* @tparam A The type of messages accepted by this actor.
*/
final class Actor[A](strategy: Strategy)(handler: A => Unit, onError: Throwable => Unit = throw(_)) {
  self =>

  private val tail = new AtomicReference(new Node[A]())
  private val suspended = new AtomicInteger(1)
  private val head = new AtomicReference(tail.get)

  /** Alias for `apply` */
  def !(a: A): Unit = {
    val n = new Node(a)
    head.getAndSet(n).lazySet(n)
    trySchedule()
  }

  /** Pass the message `a` to the mailbox of this actor */
  def apply(a: A): Unit = {
    this ! a
  }

  def contramap[B](f: B => A): Actor[B] =
    new Actor[B](strategy)((b: B) => (this ! f(b)), onError)

  private def trySchedule(): Unit = {
    if (suspended.compareAndSet(1, 0)) schedule()
  }

  private def schedule(): Unit = {
    strategy(act())
  }

  private def act(): Unit = {
    val t = tail.get
    val n = batchHandle(t, 1024)
    if (n ne t) {
      n.a = null.asInstanceOf[A]
      tail.lazySet(n)
      schedule()
    } else {
      suspended.set(1)
      if (n.get ne null) trySchedule()
    }
  }

  @tailrec
  private def batchHandle(t: Node[A], i: Int): Node[A] = {
    val n = t.get
    if (n ne null) {
      try {
        handler(n.a)
      } catch {
        case ex: Throwable => onError(ex)
      }
      if (i > 0) batchHandle(n, i - 1) else n
    } else t
  }
}

private class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]]

object Actor {

  /** Create an `Actor` backed by the given `ExecutorService`. */
  def apply[A](es: ExecutorService)(handler: A => Unit, onError: Throwable => Unit = throw(_)): Actor[A] =
    new Actor(Strategy.fromExecutorService(es))(handler, onError)
}
/**
 * Provides a function for evaluating expressions, possibly asynchronously.
 * The `apply` function should typically begin evaluating its argument
 * immediately. The returned thunk can be used to block until the resulting `A`
 * is available.
 */
trait Strategy {
  def apply[A](a: => A): () => A
}

object Strategy {

  /**
   * We can create a `Strategy` from any `ExecutorService`. It's a little more
   * convenient than submitting `Callable` objects directly.
   */
  def fromExecutorService(es: ExecutorService): Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      val f = es.submit { new Callable[A] { def call = a } }
      () => f.get
    }
  }

  /**
   * A `Strategy` which begins executing its argument immediately in the calling thread.
   */
  def sequential: Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      val r = a
      () => r
    }
  }
}
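For completeness, here is a small sketch (again not part of the original post) of using `Actor` directly, backed by an `ExecutorService` via `Strategy.fromExecutorService`, which is what `Actor.apply` does under the hood. The object name `ActorDemo` and the message values are invented for the example.

package parallelism

import java.util.concurrent.{CountDownLatch, Executors}

object ActorDemo {
  def main(args: Array[String]): Unit = {
    val es = Executors.newFixedThreadPool(2)
    val done = new CountDownLatch(3)

    // Messages are handled one at a time, so no extra synchronization is needed
    // around `sum` even though `!` may be called from many threads.
    var sum = 0
    val adder = Actor[Int](es) { n =>
      sum += n
      println(s"received $n on ${Thread.currentThread()}")
      done.countDown()
    }

    adder ! 1
    adder ! 2
    adder ! 3

    done.await()          // the latch also gives us the memory visibility we need
    println(s"sum = $sum") // 6
    es.shutdown()
  }
}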