How to do Throttling in Akka Streams
When building a streaming application you may need to throttle the upstream so that it does not exceed a specified rate. Akka Streams can either fail the stream when the rate is exceeded (ThrottleMode.enforcing) or shape it by applying back pressure (ThrottleMode.shaping). To do this, add a throttle element and specify the number of elements per time unit.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

// Emit at most one element per second; shaping mode back-pressures upstream instead of failing.
val throttleGraph = Source(1 to 1000000)
  .map(n => s"I am number $n")
  .throttle(elements = 1, per = 1.second, maximumBurst = 1, mode = ThrottleMode.shaping)
  .runWith(Sink.foreach(println))
  .onComplete(_ => system.terminate())
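
The example above uses shaping mode. For comparison, the following is a minimal sketch of the failing mode, assuming the same Akka version and ActorMaterializer setup as the rest of this article. The object name EnforcingThrottleSketch, the run helper and the 10 second timeouts are illustrative choices, not part of the Akka API. Because the source emits as fast as downstream demands, the second element is expected to arrive before a new token is available, so the stream should fail with a RateExceededException rather than slow down.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, RateExceededException, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object EnforcingThrottleSketch {

  // Runs the stream to completion (or failure) and returns the outcome.
  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("enforcing-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val done = Source(1 to 100)
      // Same rate as the shaping example, but exceeding it fails the stream.
      .throttle(elements = 1, per = 1.second, maximumBurst = 1, mode = ThrottleMode.enforcing)
      .runWith(Sink.ignore)

    // Blocking is acceptable in a small demo; avoid it in production code.
    val outcome = Try(Await.result(done, 10.seconds))
    Await.ready(system.terminate(), 10.seconds)
    outcome
  }

  def main(args: Array[String]): Unit = run() match {
    case Failure(e: RateExceededException) => println(s"Stream failed because the rate was exceeded: ${e.getMessage}")
    case Failure(other)                    => println(s"Stream failed for another reason: $other")
    case Success(_)                        => println("Stream completed within the rate limit")
  }
}
```

Enforcing mode suits cases where exceeding the rate indicates a misbehaving producer; shaping mode is usually the better fit when the goal is simply to slow the stream down.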
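The maximumBurst parameter controls how bursty the output may be. throttle follows a token bucket model: unused capacity accumulates up to maximumBurst tokens, so after a quiet period the client can send a burst of up to maximumBurst elements while still respecting the overall rate. The example below demonstrates this.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

// Stand-in for a real database write; runs on the actor system's dispatcher.
def writeToDB(batch: Seq[Int]): Future[Unit] = Future {
  println(s"Writing ${batch.size} elements to the DB using thread '${Thread.currentThread().getName}'")
}

// throttle sees batches, not single numbers: up to 10 batches (100 numbers) per second.
val throttlerGraph2 = Source(1 to 1000000)
  .grouped(10)
  .throttle(elements = 10, per = 1.second, maximumBurst = 10, mode = ThrottleMode.shaping)
  .mapAsync(10)(writeToDB)
  .runWith(Sink.ignore)
  .onComplete(_ => system.terminate())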
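
To make the effect of maximumBurst more concrete, here is a simplified model of a token bucket in plain Scala with a virtual clock. It is a sketch for illustration only and is not Akka's implementation: the names TokenBucketSketch, TokenBucketDemo, offer and shape are hypothetical, time is measured in abstract milliseconds, and every element costs one token.

```scala
// Simplified, hypothetical token bucket; NOT Akka's implementation.
final class TokenBucketSketch(capacity: Long, millisPerToken: Long) {
  private var available: Long = capacity // the bucket starts full
  private var lastRefill: Long = 0L

  // Returns 0 if an element may pass at time `now` (consuming one token),
  // otherwise the number of milliseconds to wait for the next token.
  def offer(now: Long): Long = {
    val newTokens = (now - lastRefill) / millisPerToken
    if (newTokens > 0) {
      // Unused tokens accumulate, but never beyond the bucket capacity.
      available = math.min(capacity, available + newTokens)
      lastRefill = if (available == capacity) now else lastRefill + newTokens * millisPerToken
    }
    if (available >= 1) { available -= 1; 0L }
    else millisPerToken - (now - lastRefill)
  }
}

object TokenBucketDemo {
  // Shaping behaviour: each element waits until a token is available.
  // Takes sorted arrival times and returns the emission time of each element.
  def shape(bucket: TokenBucketSketch, arrivals: Seq[Long]): Seq[Long] = {
    var clock = 0L
    arrivals.map { arrival =>
      clock = math.max(clock, arrival)
      val delay = bucket.offer(clock)
      if (delay > 0) {
        clock += delay
        bucket.offer(clock) // retry after waiting; a token is now available
      }
      clock
    }
  }

  def main(args: Array[String]): Unit = {
    val allAtOnce = Seq(0L, 0L, 0L, 0L, 0L)
    // Burst of 3: three elements pass immediately, the rest one per second.
    println(shape(new TokenBucketSketch(3, 1000), allAtOnce).mkString(", ")) // 0, 0, 0, 1000, 2000
    // Burst of 1: strictly one element per second.
    println(shape(new TokenBucketSketch(1, 1000), allAtOnce).mkString(", ")) // 0, 1000, 2000, 3000, 4000
  }
}
```

In this model a larger bucket lets the first elements through back to back, and also lets a new burst through after the upstream has been idle long enough to refill it, while the long-run rate stays at one element per token interval.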
See more in the Java or the Scala documentation.