1. 程式人生 > >akka-stream與actor系統整合以及如何處理隨之而來的背壓問題

akka-stream與actor系統整合以及如何處理隨之而來的背壓問題

    這幾天上海快下了五天的雨☔️☔️☔️☔️,淅淅瀝瀝,鬱郁沉沉。
    一共存在四個api:

  • Source.actorRef,返回actorRef,該actorRef接收到的訊息,將被下游消費者所消費。
  • Sink.actorRef,接收actorRef,做為資料流下游消費節點。
  • Source.actorPublisher,返回actorRef,使用於reactive stream的Publisher。
  • Sink.actorSubscriber,使用於reactive stream的Subscriber。

Source.actorRef

  val stringSourceinFuture=Source.actorRef[String](100
,OverflowStrategy.fail) // 快取最大為100,超出的話,將以失敗告終 val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source資料流中把不是以"haha"開頭的字串過濾掉 val actor=hahaStrSource.to(Sink.foreach(println)).run() actor!"asdsadasd" actor!"hahaasd" actor!Success("ok")// 資料流成功完成並關閉

    “how to create a Source that can receive elements later via a method call?”在akka-http中經常遇見Source[T,N]

的地方就是對檔案上傳和下載的功能的編碼(檔案IO)中,完成file=>Source[ByteString,_]的轉化,或者Source(List(1,2,3,4,5))這種hello-world級別的玩具程式碼中,這些程式碼中在定義Source時,就已經確定流中資料是什麼了。那麼如何先定義流,而後給流傳遞資料呢?答案就是Source.actorRef。鄭重說明:Source.actorRef沒有背壓策略(背壓簡單說就是生產者的生成速率大於消費者處理速率,導致資料積壓)。

Sink.actorRef

class MyActor extends Actor{
  override
def receive: Receive = { case "FIN"=> println("完成了哇!!!") context.stop(self) case str:String => println("msgStr:"+str) } } ...... val actor=system.actorOf(Props[MyActor],"myActor") val sendToActor=Sink.actorRef(actor,onCompleteMessage = "FIN") val hahaStringSource=Source.actorRef[String](100,OverflowStrategy.dropHead).filter(str=>str.startsWith("haha")) val actorReceive=hahaStringSource.to(sendToActor).run() actorReceive!"hahasdsadsa1" actorReceive!"hahasdsadsa2" actorReceive!"hahasdsadsa3" actorReceive!"hahasdsadsa4" actorReceive!Success("ok") //output msgStr:hahasdsadsa1 msgStr:hahasdsadsa2 msgStr:hahasdsadsa3 msgStr:hahasdsadsa4 完成了哇!!!

    Sink作為資料流終端消費節點,常見用法比如Sink.foreach[T](t:T=>Unit)Sink.fold[U,T](z:U)((u:U,t:T)=>U)等等。Sink.actorRef用於指定某個actorRef例項,把本該資料流終端處理的資料全部發送給這個actorRef例項去處理。解釋上述程式,Sink,actorRef需要說明哪一個actorRef來接收訊息,並且在資料流上游完成時,這個actorRef會接收到什麼樣的訊息作為完成的訊號。我們可以看到onCompleteMessage這條訊息並沒有受到str=>str.startsWith("haha")這過濾條件的作用(同樣的,Sink.actorRef沒有處理背壓功能,資料擠壓過多隻能按某些策略捨棄,或者直接失敗)。

背壓處理

以上Source.actorRefSink.actorRef均不支援背壓策略。我們可以藉助Source.actorPublisher或者Sink.actorPublisher在資料流的上游或者下游處理背壓問題,但是需要去繼承ActorPublisher[T]ActorSubscriber實現了處理邏輯。

Source.actorPublisher

在資料流上游處自己手動實現背壓處理邏輯:

case object JobAccepted
case object JobDenied
case class Job(msg:String)
...
class MyPublisherActor extends ActorPublisher[Job]{
  import akka.stream.actor.ActorPublisherMessage._
  val MAXSize=10
  var buf=Vector.empty[Job]
  override def receive: Receive = {
    case job:Job if buf.size==MAXSize =>
      sender()!JobDenied //超出快取 拒絕處理
    case job:Job =>
      sender()!JobAccepted //確認處理該任務
      buf.isEmpty&&totalDemand>0 match {
        case true =>
          onNext(job)
        case false=>
          buf:+=job //先向快取中存放job
          deliverBuf() //當下遊存在需求時,再去從快取中消費job
      }
    case req@Request(n)=>
      deliverBuf()
    case Cancel=>
      context.stop(self)
  }

  def deliverBuf():Unit= totalDemand>0 match {
    case true =>
      totalDemand<=Int.MaxValue match {
        case true =>
          val (use,keep)=buf.splitAt(totalDemand.toInt) //相當於(buf.take(n),buf.drop(n))
          buf=keep
          use.foreach(onNext(_)) //把buf一份兩半,前一半傳送給下游節點消費,後一半保留
        case false=>
          buf.take(Int.MaxValue).foreach(onNext(_))
          buf=buf.drop(Int.MaxValue)
          deliverBuf() //遞迴
      }
    case false=>
  }
}
...
val jobSource=Source.actorPublisher[Job](Props[MyPublisherActor])
val jobSourceActor=jobSource.via(Flow[Job].map(job=>Job(job.msg*2))).to(Sink.foreach(println)).run()
jobSourceActor!Job("ha")
jobSourceActor!Job("he")

    actorPublisher的函式簽名def actorPublisher[T](props: Props): Source[T, ActorRef]。上述程式碼中totalDemand是由下游消費節點確定。onNext(e)方法在ActorPublisher中定義,作用是將資料傳輸給下游節點。當然還有onComplete()onError(ex)函式,也是用於通知下游節點作出相應處理。

Sink.actorSubscriber

case class Reply(id:Int)
...
class Worker extends Actor{
  override def receive: Receive = {
    case (id:Int,job:Job)=>
      println("finish job:"+job)
      sender()!Reply(id)
  }
}
...
class CenterSubscriber extends ActorSubscriber{
  val router={ //路由組
    val routees=Vector.fill(3){ActorRefRoutee(context.actorOf(Props[Worker]))}
    Router(RoundRobinRoutingLogic(),routees)
  }
  var buf=Map.empty[Int,Job]
  override def requestStrategy: RequestStrategy = WatermarkRequestStrategy.apply(100)
  import akka.stream.actor.ActorSubscriberMessage._
  override def receive: Receive = {
    case OnNext(job:Job)=>
      val temp=(Random).nextInt(10000)->job
      buf+=temp //記錄並下發任務
      router.route(temp,self)
    case OnError(ex)=>
      println("上游發生錯誤了::"+ex.getMessage)
    case OnComplete=>
      println("該資料流完成使命..")
    case Reply(id)=>
      buf-=id//當處理完成時,刪去記錄
  }
}
...
val actor=Source.actorPublisher[Job](Props[MyPublisherActor]).to(Sink.actorSubscriber[Job](Props[CenterSubscriber])).run()
actor!Job("job1")
actor!Job("job2")
actor!Job("job3")

    ActorSubscriber可以接收如下幾種訊息型別:OnNext上游來的新訊息、OnComplete上游已經結束資料流、OnError上游發生錯誤以及其他普通型別的訊息。繼承ActorSubscriber的子類都需要覆寫requestStrategy以此來提供請求策略去控制資料流的背壓(圍繞requestDemand展開,何時向上遊請求資料,一次請求多少資料等等問題)。