I have a stream that should be controlled through an HTTP API (start, stop, only one instance at a time). The response should be streamed to the client. Here is the code, with a Play Framework controller:
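import akka.NotUsed
import akka.stream.DelayOverflowStrategy
import akka.stream.scaladsl.Source
import play.api.mvc._
import scala.concurrent.duration._

class Processor {
  // Starts a new job; a previously running instance should be stopped first.
  def job(): Source[Int, NotUsed] = {
    stop()
    Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
  }

  def stop(): Unit = {
    // TODO: cancel the running job
  }
}

class MyController(process: Processor, cc: ControllerComponents) extends AbstractController(cc) {
  def startJob = Action {
    val source = process.job()
    // Ok.chunked needs a Writeable for the element type, so emit strings.
    Ok.chunked(source.map(_.toString))
  }

  def cancel = Action {
    process.stop()
    Ok("canceled")
  }
}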
I need the ability to cancel the job. When a client closes the connection, the job should not be cancelled; the response is just like log output. I have read about KillSwitches, but I don't understand how to use them with a Play controller, which accepts a Source. Any help?
I think I need some output source that is separate from the job source.
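One way this "separate output source" idea might look in akka-stream: run the job once into a BroadcastHub behind a KillSwitch, and hand each HTTP client its own Source from the hub. This is only a sketch, assuming Akka 2.6; JobControl and its method names are hypothetical and not taken from the code above.

```scala
import akka.NotUsed
import akka.stream.{KillSwitches, Materializer, ThrottleMode, UniqueKillSwitch}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.duration._

// Hypothetical helper: owns at most one running job.
final class JobControl(implicit mat: Materializer) {
  // Kill switch and hub source of the running job; guarded by `synchronized`.
  private var running: Option[(UniqueKillSwitch, Source[Int, NotUsed])] = None

  // Starts the job unless one is already running.
  def start(): Option[Source[Int, NotUsed]] = synchronized {
    if (running.isDefined) None
    else {
      val (killSwitch, hub) =
        Source.fromIterator(() => Iterator.from(1))
          .throttle(1, 1.second, 1, ThrottleMode.Shaping)
          .viaMat(KillSwitches.single)(Keep.right)
          // bufferSize must be a power of two
          .toMat(BroadcastHub.sink[Int](bufferSize = 16))(Keep.both)
          .run()
      // A permanent no-op consumer, so the job keeps draining
      // even when no HTTP client is attached.
      hub.runWith(Sink.ignore)
      running = Some((killSwitch, hub))
      Some(hub)
    }
  }

  // Attaches to the running job, if any.
  def connect(): Option[Source[Int, NotUsed]] = synchronized(running.map(_._2))

  // Stops the job; every attached consumer sees the stream complete.
  def stop(): Unit = synchronized {
    running.foreach(_._1.shutdown())
    running = None
  }
}
```

In a controller, each action could then return something like Ok.chunked(source.map(_.toString)). A client that disconnects cancels only its own subscription to the hub, not the job itself. One caveat: BroadcastHub adapts to its slowest consumer, so a stalled client can hold the job back once the buffer fills.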
I implemented my task with a Monix Observable. Through actions I can run, cancel, and connect to the running stream. Still, I am interested in an akka-stream solution for educational purposes. Here is the Monix solution:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString
import javax.inject.Inject
import monix.execution.{Cancelable, Scheduler}
import monix.execution.atomic.AtomicAny
import monix.reactive.Observable
import monix.reactive.observables.ConnectableObservable
import play.api.mvc._
import scala.concurrent.duration._

class StreamService(implicit ec: Scheduler) {
  import StreamService.RunningStream

  private val runningStream: AtomicAny[Option[RunningStream]] = AtomicAny(None)

  // Starts the stream if none is running and returns a source attached to it.
  // Note: the get-then-set below is not atomic, so two concurrent calls could both start a stream.
  def run(): Option[Source[ByteString, NotUsed]] =
    runningStream.get match {
      case None =>
        val observable = Observable
          .interval(1.seconds)
          .map(_.toString)
          .doOnTerminate(_ => runningStream.set(None))
          .doOnSubscriptionCancel(() => runningStream.set(None))
          .publish
        val cancelable_ = observable.connect()
        this.runningStream.set(Some(RunningStream(cancelable_, observable)))
        connect()
      case _ => None
    }

  // Attaches a new subscriber to the running stream, if any.
  def connect(): Option[Source[ByteString, NotUsed]] =
    runningStream.get
      .map(rs => rs.observable.toReactivePublisher)
      .map(publisher => Source.fromPublisher(publisher).map(ByteString(_)))

  // Cancels the running stream, if any.
  def cancel(): Unit =
    runningStream.get.foreach(_.cancelable.cancel())
}

object StreamService {
  case class RunningStream(cancelable: Cancelable, observable: ConnectableObservable[String])
}

class SomeController @Inject()(streamService: StreamService, cc: ControllerComponents)
    extends AbstractController(cc) {

  def run() = Action {
    val source = streamService.run().getOrElse(throw new RuntimeException("Stream already running"))
    Ok.chunked(source)
  }

  def connect() = Action {
    val source = streamService.connect().getOrElse(throw new RuntimeException("Stream not running"))
    Ok.chunked(source)
  }

  def cancel() = Action {
    streamService.cancel()
    Ok("ok")
  }
}