Streams do not run on the caller thread. They are executed asynchronously on threads managed by the actor system's dispatcher, so the caller is not blocked. Because of this, the program does not end when the stream completes: the underlying actor system keeps running until you terminate it. To do that, use the Future returned by runWith and terminate the actor system in its completion callback.
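import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher // execution context for the Future callback

Source(0 to 10)
  .map(n => n * 2)
  .runWith(Sink.foreach(println)) // returns a Future[Done]
  .onComplete(_ => system.terminate()) // runs once the stream has finished, on success or failure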
The Akka Streams API also has a watchTermination method that can be used to monitor stream termination for both success and failure cases. This is usually a good place to add logging messages or trigger follow-up actions.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.{Failure, Success}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

Source(0 to 10)
  .map(n => n * 2)
  .watchTermination() { (_, done) =>
    // done is a Future[Done] that completes when the stream terminates
    done.onComplete {
      case Success(_) =>
        println("Stream completed successfully")
        system.terminate()
      case Failure(error) =>
        println(s"Stream failed with error ${error.getMessage}")
        system.terminate()
    }
  }
  .runWith(Sink.foreach(println))
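The example above only exercises the success branch. The following is a minimal sketch of the failure branch, assuming the same Akka version as the snippets above (one where ActorMaterializer is available). The object name WatchTerminationSketch, the helper runUntil, its failAt parameter and the injected exception are all hypothetical and exist only for illustration. It keeps the termination Future as the materialized value with Keep.right instead of registering the callback inside watchTermination.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object WatchTerminationSketch {

  // Hypothetical helper: doubles 0 to 10 and deliberately throws when it
  // reaches `failAt`, so that the failure path can be observed.
  // The returned Future[Done] is the one produced by watchTermination.
  def runUntil(failAt: Int)(implicit mat: Materializer): Future[Done] =
    Source(0 to 10)
      .map(n => if (n == failAt) throw new IllegalStateException(s"boom at $n") else n * 2)
      .watchTermination()(Keep.right) // keep the termination Future as the materialized value
      .to(Sink.foreach(println))      // `to` keeps the left (upstream) materialized value
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("watch-termination-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    runUntil(failAt = 5).onComplete { result =>
      result match {
        case Success(_)     => println("Stream completed successfully")
        case Failure(error) => println(s"Stream failed with error ${error.getMessage}")
      }
      system.terminate() // terminate in both cases so the program can exit
    }
  }
}
```

Passing a value outside the range, for example runUntil(failAt = -1), should take the success branch instead. Returning the Future rather than handling it inside watchTermination is a design choice: it lets the caller decide what to do on termination, at the cost of one extra step at the call site.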
See more in the Java or Scala documentation.