Akka Stream之Graph
最近在專案中需要實現圖的一些操作,因此,初步考慮使用Akka Stream的Graph實現。從而學習了下:
一、介紹
我們知道在Akka Stream中有三種簡單的線性資料流操作:Source/Flow/Sink。但是當我們需要使用一些複雜的操作,例如扇入和扇出時,可能就需要使用圖相關的流操作了。因此,我們可以這樣認為,Akka Stream的Graph是一種運算方案,他可能是簡單的線性資料流,也可以由基礎的流圖組合而成的複雜的資料流程。因為Graph只是對資料流運算的簡單描述,所以它是可以重複利用的。
二、依賴
要使用Akka Stream的Graph,我們需要新增下面的依賴:
<dependency>
<groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>2.5.18</version> </dependency>
三、構建Graph
Graph是由簡單的Flow組成的,這些Flow用作圖形中的線性連線以及用作Flow的扇入和扇出點的連線點。Akka Stream目前提供了下面這些連線點:
1、扇出:
(1)Broadcast[T]:
(1輸入,N輸出)給定輸入元件發射到每個輸出
(2)Balance[T]:
(1輸入,N輸出)給定輸入元件發射到其輸出埠之一
(3)UnzipWith[In,A,B,...]
:(1個輸入,N個輸出)採用1個輸入的函式,給定每個輸入的值發出N個輸出元素(其中N <= 20)
(4)UnZip[A,B]:
(1個輸入,2個輸出)將元組流(A,B)拆分為兩個流,一個是型別A,另一個是型別B
2、扇入:
(1)Merge[In]
:(N個輸入,1個輸出)從輸入中隨機選取將它們逐個推入其輸出
(2)MergePreferred[In]
:Merge
但是如果元素在最受歡迎的埠上可用,它會從中選擇,否則從中隨機從其他埠上選
(3)MergePrioritized[In]
:Merge
但是如果元素在所有輸入埠上都可用,它會根據它們的優先順序隨機選擇它們
(4)MergeLatest[In]
:(N個輸入,1個輸出)發出List[In]
,當第i個輸入流發出元素時,發出的列表中的第i個元素被更新
(5)ZipWith[A,B,...,Out]
:(N個輸入,1個輸出),其取N個輸入的函式,給出每個輸入的值,發出1個輸出元素
(6)Zip[A,B]
:(2個輸入,1個輸出)是一個ZipWith
專用於壓縮和解的輸入流A
和B
成元組流(A,B)
(7)Concat[A]
:(2個輸入,1個輸出)連線兩個流(首先消耗一個,然後消耗第二個)
四、例子
現在假設我們需要實現如下圖所示的一個Graph
我們可以用akka-stream提供的GraphDSL來構建Graph。GraphDSL繼承了GraphApply的create方法,GraphDSL.create(...)就是構建Graph的方法,因此,我們可以使用如下程式碼建立上圖所示的Graph:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val in = Source(1 to 10) val out = Sink.ignore val bcast = builder.add(Broadcast[Int](2)) val merge = builder.add(Merge[Int](2)) val f1, f2, f3, f4 = Flow[Int].map(_ + 10) in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out bcast ~> f4 ~> merge ClosedShape })
注意:在這個裡面我們需要引入import GraphDSL.Implicits._。是為了將~>(讀作邊緣,通過或者到),以及他的相反操作<~引入到程式碼的範圍內。