深圳scala-meetup-20180902(2)- Future vs Task and ReaderMonad依賴注入
在對上一次3月份的scala-meetup裡我曾分享了關於Future在函式組合中的問題及如何用Monix.Task來替代。具體分析可以查閱這篇博文。在上篇示範裡我們使用了Future來實現某種non-blocking資料庫操作,現在可以用Task替換Future部分:
class KVStore[K,V] { private val kvs = new ConcurrentHashMap[K,V]() def create(k: K, v: V): Task[Unit] = Task.delay(kvs.putIfAbsent(k,v)) def read(k: K): Task[Option[V]] = Task.delay(Option(kvs.get(k))) def update(k: K, v: V): Task[Unit] = Task.delay(kvs.put(k,v)) def delete(k: K): Task[Boolean] = Task.delay(kvs.remove(k) != null) }
Task是一個真正的Monad,我們可以放心的用來實現函式組合:
type FoodName = String type Quantity = Int type FoodStore = KVStore[String,Int] def addFood(food: FoodName, qty: Quantity)(implicit fs: FoodStore): Task[Quantity] = for { current <- fs.read(food) newQty = current.map(cq => cq + qty).getOrElse(qty) _ <- fs.update(food,newQty) } yield newQty def takeFood(food: FoodName, qty: Quantity)(implicit fs: FoodStore): Task[Quantity] = for { current <- fs.read(food) cq = current.getOrElse(0) taken = Math.min(cq,qty) left = cq - taken _ <- if(left > 0) fs.update(food,left) else fs.delete(food) } yield taken def cookSauce(qty: Quantity)(get: (FoodName,Quantity) => Task[Quantity], put: (FoodName,Quantity) => Task[Quantity]): Task[Quantity] = for { tomato <- get("Tomato",qty) vaggies <- get("Veggies",qty) _ <- get("Galic",10) sauceQ = tomato/2 + vaggies * 3 / 2 _ <- put("Sauce",sauceQ) } yield sauceQ def cookPasta(qty: Quantity)(get: (FoodName,Quantity) => Task[Quantity], put: (FoodName,Quantity) => Task[Quantity]): Task[Quantity] = for { pasta <- get("Pasta", qty) sauce <- get("Sauce", qty) _ <- get("Spice", 3) portions = Math.min(pasta, sauce) _ <- put("Meal", portions) } yield portions
跟上次我們使用Future時的方式沒有兩樣。值得研究的是如何獲取Task運算結果,及如何更精確的控制Task運算如取消執行中的Task:
implicit val refridge = new FoodStore val shopping: Task[Unit] = for { _ <- addFood("Tomato",10) _ <- addFood("Veggies",15) _ <- addFood("Garlic", 42) _ <- addFood("Spice", 100) _ <- addFood("Pasta", 6) } yield() val cooking: Task[Quantity] = for { _ <- shopping sauce <- cookSauce(10)(takeFood(_,_),addFood(_,_)) meals <- cookPasta(10)(takeFood(_,_),addFood(_,_)) } yield meals import scala.util._ import monix.execution.Scheduler.Implicits.global val cancellableCooking = Cooking.runOnComplete { result => result match { case Success(meals) => println(s"we have $meals pasta meals for the day.") case Failure(err) => println(s"cooking trouble: ${err.getMessage}") } } global.scheduleOnce(1 second) { println(s"its taking too long, cancelling cooking ...") cancellableCooking.cancel() }
在上面例子裡的addFood,takeFood函式中都有個fs:FoodStore引數。這樣做可以使函式更加通用,可以對用不同方式實施的FoodStore進行操作。這裡FoodStore就是函式的依賴,我們是通過函式引數來傳遞這個依賴的。重新組織一下程式碼使這種關係更明顯:
class Refridge {
def addFood(food: FoodName, qty: Quantity): FoodStore => Task[Quantity] = { foodStore =>
for {
current <- foodStore.read(food)
newQty = current.map(c => c + qty).getOrElse(qty)
_ <- foodStore.update(food, newQty)
} yield newQty
}
def takeFood(food: FoodName, qty: Quantity): FoodStore => Task[Quantity] = { foodStore =>
for {
current <- foodStore.read(food)
cq = current.getOrElse(0)
taken = Math.min(cq, qty)
left = cq - taken
_ <- if (left > 0) foodStore.update(food, left) else foodStore.delete(food)
} yield taken
}
}
現在我們用一個函式型別的結果來代表依賴注入。這樣做的好處是簡化了函式主體,徹底把依賴與函式進行了分割,使用函式時不必考慮依賴。
scala的函式式元件庫cats提供了一個Kleisli型別,reader monad就是從它推匯出來的:
final case class Kleisli[M[_], A, B](run: A => M[B]) { self =>
...
trait KleisliFunctions {
/**Construct a Kleisli from a Function1 */
def kleisli[M[_], A, B](f: A => M[B]): Kleisli[M, A, B] = Kleisli(f)
…
def >=>[C](k: Kleisli[M, B, C])(implicit b: Bind[M]): Kleisli[M, A, C] =
kleisli((a: A) => b.bind(this(a))(k.run))
…
Kleisli的用途就是進行函式的轉換
// (A=>M[B]) >=> (B=>M[C]) >=> (C=>M[D]) = M[D]
實際上Kleisli就是ReaderT:
type ReaderT[F[_], E, A] = Kleisli[F, E, A]
val ReaderT = Kleisli
val reader = ReaderT[F,B,A](A => F[B])
val readerTask = ReaderT[Task,B,A](A => Task[B])
val injection = ReaderT { foodStore => Task.delay { foodStore.takeFood } }
val food = injection.run(db) // run(kvs), run(dbConfig) …
這段程式碼裡我們也針對上面的例子示範了ReaderT的用法。現在我們可以把例子改成下面這樣:
type FoodName = String
type Quantity = Int
type FoodStore = KVStore[String,Int]
class Refridge {
def addFood(food: FoodName, qty: Quantity): ReaderT[Task,FoodStore,Quantity] = ReaderT{ foodStore =>
for {
current <- foodStore.read(food)
newQty = current.map(c => c + qty).getOrElse(qty)
_ <- foodStore.update(food, newQty)
} yield newQty
}
def takeFood(food: FoodName, qty: Quantity): ReaderT[Task,FoodStore,Quantity] = ReaderT{ foodStore =>
for {
current <- foodStore.read(food)
cq = current.getOrElse(0)
taken = Math.min(cq, qty)
left = cq - taken
_ <- if (left > 0) foodStore.update(food, left) else foodStore.delete(food)
} yield taken
}
}
ReaderT[F[_],E,A]就是ReaderT[Task,FoodStore,Quantity]. FoodStore是注入的依賴,ReaderT.run返回Task:
val cooking: ReaderT[Task,FoodStore,Quantity] = for {
_ <- shopping
sauce <- cooker.cookSauce(10)
pasta <- cooker.cookPasta(10)
} yield pasta
import scala.concurrent.duration._
import scala.util._
import monix.execution.Scheduler.Implicits.global
val timedCooking = cooking.run(foodStore).timeoutTo(1 seconds, Task.raiseError( new RuntimeException(
"oh no, take too long to cook ...")))
val cancellableCooking = timedCooking.runOnComplete { result =>
result match {
case Success(meals) => println(s"we have $meals specials for the day.")
case Failure(exception) => println(s"kitchen problem! ${exception.getMessage}")
}
}
global.scheduleOnce(3 seconds) {
println("3 seconds passed,cancelling ...")
cancellableCooking.cancel()
}
我們知道cooking是個ReaderT,用run(foodStore)來注入依賴foodStore。那麼如果我們還有一個kvStore或者jdbcDB,mongoDB可以直接用run(kvStore), run(jdbcDB), run(mongoDB) ... 返回的結果都是Task。