In the following code, I need the client to send a file name to the server, and the server should then reply with the contents of that file. It works if I hard-code the file name on the server side instead of sending it, but how can the client tell the server which file it needs?
Server.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future
object Server extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind("127.0.0.1", 9989)
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
var fileName = ""
// NOTE: assigning fileName = item here does not affect the later runWith.
// I am looking for a solution to this.
val go = Flow[ByteString].via(connection.flow).map(_.utf8String).map {
item => println(item); fileName = item
}.runWith(FileIO.fromPath(Paths.get(fileName)), Sink.ignore)
}
}
Client.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
object Client extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connection = Tcp().outgoingConnection("127.0.0.1", 9989)
val f = Source(List("D:/testinput.txt")).map(ByteString(_)).via(connection).
runWith(FileIO.toPath(Paths.get("D:/testoutput.txt")))
f.foreach {
_ =>
println("done")
system.terminate()
}
}
build.sbt
name := "streamtest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.20"
)
On the server side you can use the handleWith method of IncomingConnection in conjunction with a Flow that takes in the file name and emits the contents of that file:
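import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, Flow}
import akka.util.ByteString
// Reads a file name from the connection and streams back that file's bytes.
val fileNameToContentsFlow : Flow[ByteString, ByteString, _] =
  Flow[ByteString]
    .map(_.utf8String.trim) // decode the name and drop any trailing newline
    .take(1L) // only the first element is treated as a file name
    .map(fileName => Paths.get(fileName))
    .flatMapConcat( filePath => FileIO.fromPath(filePath) )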
Note: I added .take(1L) so that the client can only access one file per connection. This can be modified to handle multiple files per connection, but then an appropriate separator would need to be inserted between the contents of each file.
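As a rough sketch of the multi-file variant: the version below assumes the client sends one file name per line, and it appends a marker after each file so the client can tell where one file ends and the next begins. The object name MultiFileServer, the endOfFile marker, and the 1024-byte limit on a file name are all made up for illustration. Framing.delimiter is used only to split the incoming bytes into names.

```scala
import java.nio.file.Paths

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Framing, Source}
import akka.util.ByteString

object MultiFileServer {
  // Hypothetical marker appended after each file's contents.
  val endOfFile: ByteString = ByteString("\n--END-OF-FILE--\n")

  val fileNamesToContentsFlow: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      // Split the incoming bytes on newlines, one file name per frame.
      // allowTruncation = true also accepts a last name with no trailing newline.
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map(_.utf8String.trim)
      .filter(_.nonEmpty)
      // Stream each requested file in order, followed by the marker.
      .flatMapConcat { fileName =>
        FileIO.fromPath(Paths.get(fileName)).concat(Source.single(endOfFile))
      }
}
```

A marker like this is only safe if it cannot occur inside the files themselves. Sending the length of each file before its contents would probably be a more robust choice.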
Your server code would then be slightly modified:
connections runForeach { connection =>
connection.handleWith(fileNameToContentsFlow)
}
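Putting the pieces together, a complete server might look roughly like the following. This is a sketch rather than a tested drop-in: the object name FileServer is invented here, and it does no validation of the requested path, so any file readable by the server process could be requested.

```scala
import java.nio.file.Paths

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Tcp}
import akka.util.ByteString

object FileServer {
  // Reads one file name from the connection and streams that file's bytes back.
  val fileNameToContentsFlow: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      .map(_.utf8String.trim)
      .take(1L)
      .flatMapConcat(fileName => FileIO.fromPath(Paths.get(fileName)))

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // Same host and port as in the question.
    Tcp().bind("127.0.0.1", 9989).runForeach { connection =>
      println(s"New connection from: ${connection.remoteAddress}")
      // The flow's output is written back to the client on the same connection.
      connection.handleWith(fileNameToContentsFlow)
    }
  }
}
```

The client from the question should work unchanged against this, since it already sends a single ByteString containing the file name and writes whatever comes back to its output file.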