Search code examples
scalaakkaakka-stream

Download specified file with akka streams?


For following code, I need the client to send the file name to server, then server could reply to client the content of the specified file. It could work if I do not specify the filename, just hard coding in server part, but how client tell server it need specified file?

Server.scala

package com.tst

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future

object Server extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind("127.0.0.1", 9989)
  connections runForeach { connection =>
    println(s"New connection from: ${connection.remoteAddress}")
    var fileName = ""

    // NOTE: here, the fileName = item can not affect the latter runWith,
    // I want to find solution
    val go = Flow[ByteString].via(connection.flow).map(_.utf8String).map {
      item => println(item); fileName = item
    }.runWith(FileIO.fromPath(Paths.get(fileName)), Sink.ignore)
  }
}

Client.scala

package com.tst

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

object Client extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val connection = Tcp().outgoingConnection("127.0.0.1", 9989)
  val f = Source(List("D:/testinput.txt")).map(ByteString(_)).via(connection).
    runWith(FileIO.toPath(Paths.get("D:/testoutput.txt")))
  f.foreach {
    _ =>
      println("done")
      system.terminate()
  }
}

build.sbt

name := "streamtest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.4.20"
)

Solution

  • On the server side you can use the handleWith method for Connection in conjunction with a Flow that takes in the file name and produces the lines of the file:

    import akka.stream.scaladsl.FileIO
    import java.nio.file.Paths
    
    val fileNameToContentsFlow : Flow[ByteString, ByteString, _] = 
      Flow[ByteString]
        .map(_.utf8String)
        .take(1L)
        .map(fileName => Paths.get(fileName))
        .flatMapConcat( filePath => FileIO.fromPath(filePath) )
    

    Note: I added .take(1L) so that the client can only access 1 file per Connection. This can be modified to handle multiple files per connection but then an appropriate separator would need to be inserted in between each individual file's contents.

    Your server code would then be slightly modified

    connections runForeach { connection =>
    
      connection.handleWith(fileNameToContentsFlow)
    }