To build efficient, scalable and low-latency data streams, it is often important to perform tasks concurrently. However, by default Akka Streams fuses stages together and runs them sequentially inside a single actor, so only one thread works on the whole pipeline at any moment. You must request concurrency explicitly, by inserting asynchronous boundaries (`Attributes.asyncBoundary`) into your flows and graphs with the `async` method on your `Source`, `Sink` or `Flow`.
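In the Akka sources, `.async` is defined in terms of `addAttributes(Attributes.asyncBoundary)`, so the two forms below should mark the same boundary. This is a minimal sketch, assuming classic Akka 2.5/2.6 with the `akka-stream` module on the classpath. The object and value names are illustrative. It builds the same source both ways and checks that the boundary changes only how the stream executes, not what it emits.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryForms {
  val double = Flow[Int].map(_ * 2)

  // Two ways to place an asynchronous boundary after `double`.
  val viaAsync = Source(1 to 5).via(double).async
  val viaAttributes = Source(1 to 5).via(double).addAttributes(Attributes.asyncBoundary)

  // Runs both sources and returns their collected outputs.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("async-forms")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val a = Await.result(viaAsync.runWith(Sink.seq), 10.seconds)
      val b = Await.result(viaAttributes.runWith(Sink.seq), 10.seconds)
      (a, b)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both sources emit the same elements. The boundary only decides where the graph is split into separately running actors.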
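Choosing which stages to run concurrently requires understanding the cost of each operation in the pipeline. Every asynchronous boundary adds buffering and message passing between actors, so a boundary tends to pay off only around stages that do substantial work. For more details, see this Akka blog article on the subject.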
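A common consequence is to leave cheap stages fused and put a boundary only around the expensive one. The sketch below assumes classic Akka 2.5/2.6. The `expensive` function is a hypothetical stand-in for real CPU-heavy work, and the input buffer size of 4 is an arbitrary illustration value. The `.async.addAttributes(Attributes.inputBuffer(...))` pattern follows the one shown in the Akka Streams buffer documentation.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SelectiveBoundaries {
  // Hypothetical CPU-heavy work: sum of squares 1..n, standing in for real processing.
  def expensive(n: Int): Long = (1L to n.toLong).foldLeft(0L)((acc, x) => acc + x * x)

  // Cheap step: not worth its own actor, so it stays fused with the source.
  val cheap = Flow[Int].map(_ + 1)

  // Heavy step: isolated behind an async boundary with a small input buffer
  // (the buffer size of 4 is an arbitrary value chosen for illustration).
  val heavy = Flow[Int]
    .map(n => expensive(n * 1000))
    .async
    .addAttributes(Attributes.inputBuffer(initial = 4, max = 4))

  // Runs 1..count through both steps and collects the results.
  def run(count: Int): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("selective-boundaries")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to count).via(cheap).via(heavy).runWith(Sink.seq), 60.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run(20).take(3))
}
```

With this layout the source and `cheap` share one actor while `heavy` runs in its own, so the next elements can be prepared while `heavy` is still busy.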
For example, compare the output of `normalGraph` (sequential, fully fused) and `concurrentGraph` (with asynchronous boundaries) in the program below.
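```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object AsyncBoundaryDemo extends App {
  implicit val system: ActorSystem = ActorSystem()
  // On Akka 2.6+ ActorMaterializer is deprecated; the implicit system is enough there.
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  // Pass-through stage that logs which thread processes each element.
  def myStage(name: String): Flow[Int, Int, NotUsed] = Flow[Int].map { index =>
    println(s"Stage $name is processing $index using thread '${Thread.currentThread().getName}'")
    index
  }

  // No boundaries: all four stages are fused and run sequentially in one actor.
  val normalGraph: RunnableGraph[Future[Done]] = Source(1 to 100000)
    .via(myStage("A"))
    .via(myStage("B"))
    .via(myStage("C"))
    .via(myStage("D"))
    .toMat(Sink.ignore)(Keep.right)

  // Each .async marks a boundary, so the stages can run concurrently in separate actors.
  val concurrentGraph: RunnableGraph[Future[Done]] = Source(1 to 100000)
    .via(myStage("A")).async
    .via(myStage("B")).async
    .via(myStage("C")).async
    .via(myStage("D")).async
    .toMat(Sink.ignore)(Keep.right)

  // Run one graph at a time and compare the thread names in the output.
  normalGraph.run().onComplete(_ => system.terminate())
  // concurrentGraph.run().onComplete(_ => system.terminate())
}
```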
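Reading hundreds of thousands of log lines is tedious, so a variant can count the distinct threads each stage ran on instead. This is a sketch under the same Akka 2.5/2.6 assumption. The `census` helper and `ThreadCensus` object are hypothetical names. Exact counts depend on the machine and dispatcher configuration: even a fused graph's single actor may be scheduled on different pool threads between messages, so treat the numbers as an indication rather than a guarantee.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.concurrent.TrieMap
import scala.concurrent.Await
import scala.concurrent.duration._

object ThreadCensus {
  // Runs stages A..D over 1..n (optionally with .async after each) and returns
  // the element count plus the number of distinct threads seen per stage.
  def census(async: Boolean, n: Int): (Int, Map[String, Int]) = {
    implicit val system: ActorSystem = ActorSystem("thread-census")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    // (stage name, thread name) pairs; TrieMap is safe to update from several actors.
    val seen = TrieMap.empty[(String, String), Unit]

    def stage(name: String): Flow[Int, Int, NotUsed] = Flow[Int].map { i =>
      seen.put((name, Thread.currentThread().getName), ())
      i
    }

    val source = Seq("A", "B", "C", "D").foldLeft(Source(1 to n)) { (src, name) =>
      if (async) src.via(stage(name)).async else src.via(stage(name))
    }

    try {
      val count = Await.result(source.runWith(Sink.fold[Int, Int](0)((acc, _) => acc + 1)), 60.seconds)
      val threadsPerStage = seen.keys.toSeq.groupBy(_._1).map { case (name, pairs) => name -> pairs.size }
      (count, threadsPerStage)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(s"fused: ${census(async = false, n = 100000)}")
    println(s"async: ${census(async = true, n = 100000)}")
  }
}
```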
See the Akka Streams documentation (Java or Scala) for more.