akka stream第五課-動態流處理
Dynamic stream handling
動態流處理
Dependency
To use Akka Streams, add the module to your project:
-
val AkkaVersion = "2.6.9" libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion
Introduction
Controlling stream completion with KillSwitch
使用KillSwitch控制流完成
AKillSwitch
allows the completion of operators ofFlowShape
FlowShape
needing completion control. TheKillSwitch
traitallows to:
KillSwitch允許從外部完成FlowShape的操作。它由一個流元素組成,該元素可以連結到需要完成控制的FlowShape操作符。KillSwitch特性允許:
- complete the stream(s) via
shutdown()
通過shutdown()完成流
- fail the stream(s) via
abort(Throwable error)
通過中止使流失敗(可丟擲錯誤)
-
trait KillSwitch { /** * After calling [[KillSwitch#shutdown]] the linked [[Graph]]s of [[FlowShape]] are completed normally. */ def shutdown(): Unit /** * After calling [[KillSwitch#abort]] the linked [[Graph]]s of [[FlowShape]] are failed. */ def abort(ex: Throwable): Unit }
After the first call to eithershutdown
orabort
, all subsequent calls to any of these methods will be ignored. Stream completion is performed by both
在第一次呼叫shutdown或abort之後,所有對這些方法的後續呼叫都將被忽略。流完成由兩者執行
- cancelling its upstream.
取消它的上游。
- completing (in case of
shutdown
) or failing (in case ofabort
) its downstream
完成(關閉時)或失敗(中止時)其下游
AKillSwitch
can control the completion of one or multiple streams, and therefore comes in two different flavours.
KillSwitch可以控制一個或多個流的完成,因此有兩種不同的特點。
UniqueKillSwitch
UniqueKillSwitch
allows to control the completion ofonematerializedGraph
ofFlowShape
. Refer to the below for usage examples.
UniqueKillSwitch允許控制FlowShape的一個物化圖形的完成。請參閱下面的用法示例。
- Shutdown
- 關閉
-
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) val lastSnk = Sink.last[Int] val (killSwitch, last) = countingSrc .viaMat(KillSwitches.single)(Keep.right) .toMat(lastSnk)(Keep.both) .run() doSomethingElse() killSwitch.shutdown() Await.result(last, 1.second) shouldBe 2
- Abort
- 中止(與about關於不同)
-
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) val lastSnk = Sink.last[Int] val (killSwitch, last) = countingSrc .viaMat(KillSwitches.single)(Keep.right) .toMat(lastSnk)(Keep.both).run() val error = new RuntimeException("boom!") killSwitch.abort(error) Await.result(last.failed, 1.second) shouldBe error
SharedKillSwitch
ASharedKillSwitch
allows to control the completion of an arbitrary number operators ofFlowShape
. It can be materialized multiple times via itsflow
method, and all materialized operators linked to it are controlled by the switch. Refer to the below for usage examples.
SharedKillSwitch允許控制FlowShape任意數量運算子的完成。它可以通過其流方法實現多次物化,所有與之相關的物化操作符都由開關控制。請參閱下面的用法示例。
- Shutdown
- 關閉
-
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) val lastSnk = Sink.last[Int] val sharedKillSwitch = KillSwitches.shared("my-kill-switch") val last = countingSrc .via(sharedKillSwitch.flow) .runWith(lastSnk) val delayedLast = countingSrc .delay(1.second, DelayOverflowStrategy.backpressure) .via(sharedKillSwitch.flow) .runWith(lastSnk) doSomethingElse() sharedKillSwitch.shutdown() Await.result(last, 1.second) shouldBe 2 Await.result(delayedLast, 1.second) shouldBe 1
- Abort
- 中止
-
val countingSrc = Source(Stream.from(1)).delay(1.second) val lastSnk = Sink.last[Int] val sharedKillSwitch = KillSwitches.shared("my-kill-switch") val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk) val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk) val error = new RuntimeException("boom!") sharedKillSwitch.abort(error) Await.result(last1.failed, 1.second) shouldBe error Await.result(last2.failed, 1.second) shouldBe error
AUniqueKillSwitch
is always a result of a materialization, whilstSharedKillSwitch
needs to be constructed before any materialization takes place.
注意
UniqueKillSwitch總是物化的結果,而SharedKillSwitch需要在任何具體化發生之前構造。
Dynamic fan-in and fan-out with MergeHub, BroadcastHub and PartitionHub
使用MergeHub、BroadcastHub和PartitionHub動態扇入和扇出
There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow) are dynamic and not known in advance. The Graph DSL does not allow to represent this, all connections of the graph must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs should be used. They provide means to constructSink
andSource
pairs that are “attached” to each other, but one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
在許多情況下,某個服務的消費者或生產者(表示為接收器、源或可能的流)是動態的,並且事先不知道。圖DSL不允許表示這一點,圖的所有連線必須事先知道,並且必須預先連線。為了允許動態扇入和扇出流,應該使用集線器。它們提供了構造相互“連線”的Sink和Source對的方法,但其中一個可以多次具體化以實現動態扇入或扇出。
Using the MergeHub
AMergeHub
allows to implement a dynamic fan-in junction point in a graph where elements coming from different producers are emitted in a First-Comes-First-Served fashion. If the consumer cannot keep up thenallof the producers are backpressured. The hub itself comes as aSource
to which the single consumer can be attached. It is not possible to attach any producers until thisSource
has been materialized (started). This is ensured by the fact that we only get the correspondingSink
as a materialized value. Usage might look like this:
- MergeHub允許在圖中實現一個動態扇入連線點,其中來自不同生產者的元素以先到先得的方式發出。如果消費者跟不上,那麼所有的生產商都會揹負壓力。集線器本身就是一個可以連線單個消費者的源。在這個源被具體化(啟動)之前,不可能附加任何生產者。這是由這樣一個事實所保證的,即我們只獲得對應的Sink作為一個物化值。用法可能如下所示:
-
// A simple consumer that will print to the console for now val consumer = Sink.foreach(println) // Attach a MergeHub Source to the consumer. This will materialize to a // corresponding Sink. val runnableGraph: RunnableGraph[Sink[String, NotUsed]] = MergeHub.source[String](perProducerBufferSize = 16).to(consumer) // By running/materializing the consumer we get back a Sink, and hence // now have access to feed elements into it. This Sink can be materialized // any number of times, and every element that enters the Sink will // be consumed by our consumer. val toConsumer: Sink[String, NotUsed] = runnableGraph.run() // Feeding two independent sources into the hub. Source.single("Hello!").runWith(toConsumer) Source.single("Hub!").runWith(toConsumer)
This sequence, while might look odd at first, ensures proper startup order. Once we get theSink
, we can use it as many times as wanted. Everything that is fed to it will be delivered to the consumer we attached previously until it cancels.
這個序列,雖然一開始看起來很奇怪,但可以確保正確的啟動順序。一旦我們得到水槽,我們就可以隨心所欲地使用它。所有供給它的東西都將被送到我們之前附加的消費者,直到它取消。
Using the BroadcastHub
使用BroadcastHub
ABroadcastHub
can be used to consume elements from a common producer by a dynamic set of consumers. The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is aSink
to which the single producer must be attached first. Consumers can only be attached once theSink
has been materialized (i.e. the producer has been started). One example of using theBroadcastHub
:
- BroadcastHub可用於由一組動態的使用者使用來自公共生產者的元素。生產者的稅率將自動適應最慢的消費者。在這種情況下,集線器是一個接收器,必須首先連線單個生產者。只有當接收器被具體化(即生產者已經啟動)時,才能附加消費者。使用BroadcastHub的一個示例:
-
// A simple producer that publishes a new "message" every second val producer = Source.tick(1.second, 1.second, "New message") // Attach a BroadcastHub Sink to the producer. This will materialize to a // corresponding Source. // (We need to use toMat and Keep.right since by default the materialized // value to the left is used) val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right) // By running/materializing the producer, we get back a Source, which // gives us access to the elements published by the producer. val fromProducer: Source[String, NotUsed] = runnableGraph.run() // Print out messages from the producer in two independent consumers fromProducer.runForeach(msg => println("consumer1: " + msg)) fromProducer.runForeach(msg => println("consumer2: " + msg))
The resultingSource
can be materialized any number of times, each materialization effectively attaching a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements but instead backpressure the upstream producer until subscribers arrive. This behavior can be tweaked by using the operators.buffer
for example with a drop strategy, or attaching a subscriber that drops all messages. If there are no other subscribers, this will ensure that the producer is kept drained (dropping all elements) and once a new subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
產生的源可以被物化任意次數,每次物化有效地附加一個新的訂戶。如果沒有訂閱伺服器連線到這個集線器,那麼它不會丟棄任何元素,而是向上遊生產商施加反壓力,直到訂閱伺服器到達為止。可以通過使用operators.buffer(例如使用drop策略)或附加一個刪除所有訊息的訂閱伺服器來調整此行為。如果沒有其他訂閱者,這將確保生產者保持枯竭(刪除所有元素),一旦新訂戶到達,它將自適應地減速,確保不再丟棄更多的訊息。
Combining dynamic operators to build a simple Publish-Subscribe service
結合動態運算子構建簡單的釋出-訂閱服務
The features provided by the Hub implementations are limited by default. This is by design, as various combinations can be used to express additional features like unsubscribing producers or consumers externally. We show here an example that builds aFlow
representing a publish-subscribe channel. The input of theFlow
is published to all subscribers while the output streams all the elements published.
預設情況下,集線器實現提供的功能受到限制。生產商可以通過外部的多種功能組合來表示退訂。我們在這裡展示了一個示例,它構建了一個表示釋出訂閱通道的流。流的輸入被髮布到所有訂閱伺服器,而輸出流傳輸所有已釋出的元素。
First, we connect aMergeHub
and aBroadcastHub
together to form a publish-subscribe channel. Once we materialize this small stream, we get back a pair ofSource
andSink
that together define the publish and subscribe sides of our channel.
- 首先,我們將MergeHub和BroadcastHub連線在一起,形成一個釋出-訂閱頻道。一旦我們實現了這個小流,我們就得到了一對源和匯,它們共同定義了我們通道的釋出和訂閱端。
-
// Obtain a Sink and Source which will publish and receive from the "bus" respectively. val (sink, source) = MergeHub.source[String](perProducerBufferSize = 16).toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
We now use a few tricks to add more features. First of all, we attach aSink.ignore
at the broadcast side of the channel to keep it drained when there are no subscribers. If this behavior is not the desired one this line can be dropped.
- 我們現在使用一些技巧來新增更多功能。首先,我們附上Sink.忽略在頻道的廣播側,以在沒有訂戶的情況下保持它的流量。如果此行為不是期望的行為,則可以刪除該行。
-
// Ensure that the Broadcast output is dropped if there are no listening parties. // If this dropping Sink is not attached, then the broadcast hub will not drop any // elements itself when there are no subscribers, backpressuring the producer instead. source.runWith(Sink.ignore)
We now wrap theSink
andSource
in aFlow
usingFlow.fromSinkAndSource
. This bundles up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side (even if the subscriber side is dropping). It also allows us to attach aKillSwitch
as aBidiStage
which in turn makes it possible to close both the originalSink
andSource
at the same time. Finally, we addbackpressureTimeout
on the consumer side to ensure that subscribers that block the channel for more than 3 seconds are forcefully removed (and their stream failed).
- 我們現在使用來自sinkandsource的流. 這會將通道的兩側捆綁成一個,並強制通道的使用者始終定義釋出者和訂閱者端(即使訂戶端正在退出)。它還允許我們附加一個KillSwitch作為BidiStage,從而使我們能夠同時關閉原始的Sink和Source。最後,我們在使用者端新增backpressureTimeout,以確保阻塞通道超過3秒的訂戶被強制刪除(並且他們的流失敗)。
-
// We create now a Flow that represents a publish-subscribe channel using the above // started stream as its "topic". We add two more features, external cancellation of // the registration and automatic cleanup for very slow subscribers. val busFlow: Flow[String, String, UniqueKillSwitch] = Flow .fromSinkAndSource(sink, source) .joinMat(KillSwitches.singleBidi[String, String])(Keep.right) .backpressureTimeout(3.seconds)
The resulting Flow now has a type ofFlow[String, String, UniqueKillSwitch]
representing a publish-subscribe channel which can be used any number of times to attach new producers or consumers. In addition, it materializes to aUniqueKillSwitch
(seeUniqueKillSwitch) that can be used to deregister a single user externally:
- 生成的流現在有一個流型別[String,String,UniqueKillSwitch],它表示一個釋出訂閱通道,可以多次使用該通道來附加新的生產者或消費者。此外,它具體化為UniqueKillSwitch(請參見UniqueKillSwitch),可用於在外部登出單個使用者:
-
val switch: UniqueKillSwitch = Source.repeat("Hello world!").viaMat(busFlow)(Keep.right).to(Sink.foreach(println)).run() // Shut down externally switch.shutdown()
Using the PartitionHub
使用PartitionHub
This is amay changefeature*
這是一個可能改變的功能*
APartitionHub
can be used to route elements from a common producer to a dynamic set of consumers. The selection of consumer is done with a function. Each element can be routed to only one consumer.
PartitionHub可用於將元素從一個公共生產者路由到一組動態的使用者。消費者的選擇是通過函式完成的。每個元素只能路由到一個使用者。
The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is aSink
to which the single producer must be attached first. Consumers can only be attached once theSink
has been materialized (i.e. the producer has been started). One example of using thePartitionHub
:
生產者的稅率將自動適應最慢的消費者。在這種情況下,集線器是一個接收器,必須首先連線單個生產者。只有當接收器被具體化(即生產者已經啟動)時,才能附加消費者。使用PartitionHub的一個示例:
-
// A simple producer that publishes a new "message-" every second val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b") // Attach a PartitionHub Sink to the producer. This will materialize to a // corresponding Source. // (We need to use toMat and Keep.right since by default the materialized // value to the left is used) val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat( PartitionHub.sink( (size, elem) => math.abs(elem.hashCode % size), startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right) // By running/materializing the producer, we get back a Source, which // gives us access to the elements published by the producer. val fromProducer: Source[String, NotUsed] = runnableGraph.run() // Print out messages from the producer in two independent consumers fromProducer.runForeach(msg => println("consumer1: " + msg)) fromProducer.runForeach(msg => println("consumer2: " + msg))
Thepartitioner
function takes two parameters; the first is the number of active consumers and the second is the stream element. The function should return the index of the selected consumer for the given element, i.e.int
greater than or equal to 0 and less than number of consumers.
partitioner函式接受兩個引數;第一個引數是活動使用者的數量,第二個引數是stream元素。函式應返回給定元素的選定使用者的索引,即int大於或等於0且小於消費者數量。
The resultingSource
can be materialized any number of times, each materialization effectively attaching a new consumer. If there are no consumers attached to this hub then it will not drop any elements but instead backpressure the upstream producer until consumers arrive. This behavior can be tweaked by using an operator, for example.buffer
with a drop strategy, or attaching a consumer that drops all messages. If there are no other consumers, this will ensure that the producer is kept drained (dropping all elements) and once a new consumer arrives and messages are routed to the new consumer it will adaptively slow down, ensuring no more messages are dropped.
產生的源可以被物化任意次數,每次物化都有效地附加了一個新的消費者。如果沒有消費者連線到這個中心,那麼它不會丟棄任何元素,而是向上遊生產商施加反壓力,直到消費者到達。這種行為可以通過使用運算子進行調整,例如,使用刪除策略的緩衝區,或附加一個刪除所有訊息的使用者。如果沒有其他消費者,這將確保生產者保持枯竭(丟棄所有元素),一旦新的消費者到達,訊息被路由到新的消費者,它將自適應地減慢速度,確保不再丟棄更多的訊息。
It is possible to define how many initial consumers that are required before it starts emitting any messages to the attached consumers. While not enough consumers have been attached messages are buffered and when the buffer is full the upstream producer is backpressured. No messages are dropped.
在它開始向附加的使用者傳送任何訊息之前,可以定義需要多少初始使用者。雖然沒有足夠的使用者被附加到訊息緩衝區,當緩衝區已滿時,上游生產者將背壓。不會丟棄任何訊息。
The above example illustrate a stateless partition function. For more advanced stateful routing thestatefulSink
can be used. Here is an example of a stateful round-robin function:
上面的示例演示了一個無狀態分割槽函式。對於更高階的有狀態路由,可以使用statefulSink。下面是一個有狀態迴圈函式的示例:
-
// A simple producer that publishes a new "message-" every second val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b") // New instance of the partitioner function and its state is created // for each materialization of the PartitionHub. def roundRobin(): (PartitionHub.ConsumerInfo, String) => Long = { var i = -1L (info, elem) => { i += 1 info.consumerIdByIdx((i % info.size).toInt) } } // Attach a PartitionHub Sink to the producer. This will materialize to a // corresponding Source. // (We need to use toMat and Keep.right since by default the materialized // value to the left is used) val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat(PartitionHub.statefulSink(() => roundRobin(), startAfterNrOfConsumers = 2, bufferSize = 256))( Keep.right) // By running/materializing the producer, we get back a Source, which // gives us access to the elements published by the producer. val fromProducer: Source[String, NotUsed] = runnableGraph.run() // Print out messages from the producer in two independent consumers fromProducer.runForeach(msg => println("consumer1: " + msg)) fromProducer.runForeach(msg => println("consumer2: " + msg))
Note that it is a factory of a function to to be able to hold stateful variables that are unique for each materialization.
請注意,它是一個函式的工廠,能夠儲存每個具體化都是唯一的狀態變數。
The function takes two parameters; the first is information about active consumers, including an array of consumer identifiers and the second is the stream element. The function should return the selected consumer identifier for the given element. The function will never be called when there are no active consumers, i.e. there is always at least one element in the array of identifiers.
該函式接受兩個引數;第一個引數是有關活動使用者的資訊,包括一個使用者識別符號陣列,第二個引數是流元素。函式應返回給定元素的選定使用者識別符號。如果沒有活動的消費者,即識別符號陣列中始終至少有一個元素,則永遠不會呼叫該函式。
Another interesting type of routing is to prefer routing to the fastest consumers. TheConsumerInfo
has an accessorqueueSize
that is approximate number of buffered elements for a consumer. Larger value than other consumers could be an indication of that the consumer is slow. Note that this is a moving target since the elements are consumed concurrently. Here is an example of a hub that routes to the consumer with least buffered elements:
另一種有趣的路由選擇是選擇最快的使用者。ConsumerInfo有一個訪問器queueSize,它是一個使用者的緩衝元素的近似數量。比其他消費者更大的價值可能表明消費者行動遲緩。請注意,這是一個移動的目標,因為元素是併發使用的。以下是一個集線器的示例,該集線器使用最少的緩衝元素路由到使用者:
-
val producer = Source(0 until 100) // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer. // Note that this is a moving target since the elements are consumed concurrently. val runnableGraph: RunnableGraph[Source[Int, NotUsed]] = producer.toMat( PartitionHub.statefulSink( () => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)), startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right) val fromProducer: Source[Int, NotUsed] = runnableGraph.run() fromProducer.runForeach(msg => println("consumer1: " + msg)) fromProducer.throttle(10, 100.millis).runForeach(msg => println("consumer2: " + msg))