I'm building an app that has the following flow: data is passed through an external command (ffmpeg in the end, but for this simple reproducible use case it is just cat, so that the data passes straight through it).
So I'm doing the following: I use ProcessBuilder in conjunction with Flow.fromSinkAndSource to build a flow out of this external process execution.
Complete code example:
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.GraphDSL.Implicits._
    import akka.stream.scaladsl._
    import akka.stream.ClosedShape
    import akka.util.ByteString
    import java.io.{BufferedInputStream, BufferedOutputStream}
    import java.nio.file.Paths
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    object MyApp extends App {
      // When this is changed to something above 15, the graph just stops
      val PROCESSES_COUNT = Integer.parseInt(args(0))
      println(s"Running with ${PROCESSES_COUNT} processes...")

      implicit val system = ActorSystem("MyApp")
      implicit val globalContext: ExecutionContext = ExecutionContext.global

      def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
        val convertProcess = new ProcessBuilder(cmd).start
        val pipeIn = new BufferedOutputStream(convertProcess.getOutputStream)
        val pipeOut = new BufferedInputStream(convertProcess.getInputStream)
        Flow
          .fromSinkAndSource(StreamConverters.fromOutputStream(() => pipeIn), StreamConverters.fromInputStream(() => pipeOut))
      }

      val source = Source(1 to 100)
        .map(element => {
          println(s"--emit: ${element}")
          ByteString(element)
        })

      val sinksList = (1 to PROCESSES_COUNT).map(i => {
        Flow[ByteString]
          .via(executeCmdOnStream("cat"))
          .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
      })

      val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>
        val broadcast = builder.add(Broadcast[ByteString](sinks.size))
        source ~> broadcast.in
        for (i <- broadcast.outlets.indices) {
          broadcast.out(i) ~> sinks(i)
        }
        ClosedShape
      }

      Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)
    }
Run this using the following command:
    sbt "run PROCESSES_COUNT"
e.g.
    sbt "run 15"
This all works quite well until I raise the number of external processes (PROCESSES_COUNT in the code). When it's 15 or less, all goes well, but when it's 16 or more, the following happens. The cat processes are started in the system (all 16 of them), but the graph hangs. When I kill one of the cat processes in the system, something frees up and processing continues (of course, one file in the result is empty because I killed its processing command).
I checked that this is definitely caused by the external execution (and not, e.g., by a limit of Akka Broadcast itself).
I recorded a video showing these two situations (first, 15 items working fine and then 16 items hanging and freed up by killing one process) - link to the video
Both the code and video are in this repo
I'd appreciate any help or suggestions on where to look for a solution to this one.
It is an interesting problem, and it looks like the stream is deadlocking. Increasing the number of threads may fix the symptom, but not the underlying problem.
The problem is the following code:
    Flow
      .fromSinkAndSource(
        StreamConverters.fromOutputStream(() => pipeIn),
        StreamConverters.fromInputStream(() => pipeOut)
      )
Both fromInputStream and fromOutputStream will be using the same default-blocking-io-dispatcher, as you correctly noticed. The reason for using a dedicated thread pool is that both perform Java API calls that block the running thread.
Here is part of a thread stack trace of fromInputStream that shows where the blocking happens.
    at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
    at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
    at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
    at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
    - locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
    at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
    at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
    - locked <merged>(a java.io.BufferedInputStream)
    at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
    at akka.stream.impl.io.InputStreamSource$$anon$1.onPull(InputStreamSource.scala:63)
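A trace like the one above is typically taken with jstack against the running JVM, but the same information is also available in-process. The sketch below is one way to do that with the standard Thread.getAllStackTraces call. BlockedThreads and topFrames are hypothetical names introduced for illustration, and matching on a fragment of the thread name assumes that the dispatcher id appears in the names of its threads.

```scala
object BlockedThreads {
  // Returns "threadName: topmost stack frame" for every live thread
  // whose name contains `nameFragment` (hypothetical helper).
  def topFrames(nameFragment: String): List[String] = {
    val out = List.newBuilder[String]
    Thread.getAllStackTraces.forEach { (thread, frames) =>
      // Threads without any frames (e.g. not yet started) are skipped.
      if (thread.getName.contains(nameFragment) && frames.nonEmpty)
        out += s"${thread.getName}: ${frames.head}"
    }
    out.result()
  }
}
```

For example, BlockedThreads.topFrames("default-blocking-io-dispatcher").foreach(println) could be called from a separate thread while the graph hangs, to see whether every thread of that pool is sitting in a read call.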
Now, you're running 16 simultaneous Sinks that are connected to a single Source. To support back-pressure, a Source will only produce an element when all Sinks send a pull command.
What happens next is that you have 16 calls to the method FileInputStream.readBytes at the same time, and they immediately block all threads of default-blocking-io-dispatcher, whose pool has 16 threads by default. That leaves no threads for fromOutputStream to write any data from the Source or to perform any other kind of work. Thus, you have a deadlock.
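The starvation described above can be reproduced without Akka or external processes. The following is a minimal sketch, not the original application: PoolStarvationDemo, its run method, and the latch standing in for data arriving on a process pipe are all hypothetical simplifications introduced for illustration. Readers are submitted first and block, as FileInputStream.readBytes does, and the single writer only gets to run if its pool still has a free thread.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PoolStarvationDemo {
  // Returns true if every reader finished within the timeout,
  // false if the readers were still blocked (the pool was starved).
  def run(readers: Int, poolSize: Int, separateWriterPool: Boolean): Boolean = {
    val readerPool = Executors.newFixedThreadPool(poolSize)
    val writerPool =
      if (separateWriterPool) Executors.newFixedThreadPool(poolSize) else readerPool
    val dataWritten = new CountDownLatch(1)       // stands in for "bytes arrived on the pipe"
    val readersDone = new CountDownLatch(readers)
    try {
      // Each reader blocks until the writer has produced data.
      (1 to readers).foreach { _ =>
        readerPool.execute { () =>
          try { dataWritten.await(); readersDone.countDown() }
          catch { case _: InterruptedException => () } // pool shut down while blocked
        }
      }
      // The writer is queued behind the readers if it shares their pool.
      writerPool.execute(() => dataWritten.countDown())
      readersDone.await(500, TimeUnit.MILLISECONDS)
    } finally {
      readerPool.shutdownNow()
      if (separateWriterPool) writerPool.shutdownNow()
    }
  }
}
```

Under these assumptions, run(16, 16, separateWriterPool = false) times out because all 16 threads are parked in readers, while run(15, 16, separateWriterPool = false) and run(16, 2, separateWriterPool = true) complete.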
The problem can be fixed if you increase the number of threads in the pool, but this just removes the symptom.
The correct solution is to run fromOutputStream and fromInputStream in two separate thread pools. Here is how you can do it.
    Flow
      .fromSinkAndSource(
        StreamConverters.fromOutputStream(() => pipeIn).async("blocking-1"),
        StreamConverters.fromInputStream(() => pipeOut).async("blocking-2")
      )
with the following config:
    blocking-1 {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1
      thread-pool-executor {
        fixed-pool-size = 2
      }
    }
    blocking-2 {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1
      thread-pool-executor {
        fixed-pool-size = 2
      }
    }
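These blocks normally live in the application's application.conf so that the dispatcher ids blocking-1 and blocking-2 can be resolved. For a small reproduction such as this one, the same settings can also be supplied in code. The following is only a sketch: DispatcherSetup, fixedPool, and newSystem are hypothetical names, and the pool size of 2 simply mirrors the config above.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object DispatcherSetup {
  // Renders one dispatcher block in the same shape as the config above.
  private def fixedPool(name: String, threads: Int): String =
    s"""
       |$name {
       |  type = "Dispatcher"
       |  executor = "thread-pool-executor"
       |  throughput = 1
       |  thread-pool-executor {
       |    fixed-pool-size = $threads
       |  }
       |}
       |""".stripMargin

  // The two custom dispatchers, layered on top of the regular configuration.
  val config: Config = ConfigFactory
    .parseString(fixedPool("blocking-1", 2) + fixedPool("blocking-2", 2))
    .withFallback(ConfigFactory.load())

  // Creates an actor system that can resolve "blocking-1" and "blocking-2".
  def newSystem(): ActorSystem = ActorSystem("MyApp", config)
}
```

With this in place, implicit val system = DispatcherSetup.newSystem() would take the place of ActorSystem("MyApp") in the question's code.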
Because they no longer share a pool, fromOutputStream and fromInputStream can perform their tasks independently.
Also note that I assigned just 2 threads per pool to show that it's not about the thread count but about the pool separation.
I hope this helps you understand Akka Streams better.