I currently have a simple TextMessage Source that sends messages to my WebSocket client flow like this:
val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
    case _ =>
  }
// send this as a message over the WebSocket
val outgoing: Source[TextMessage.Strict, NotUsed] = Source
  .combine(
    Source.single(
      TextMessage(
        """{"auth":"APIKEY-123"}"""
      )
    ),
    Source.single(
      TextMessage(
        """{"topic":"topic123"}"""
      )
    ),
    Source.never
  )(Merge(_))
  .throttle(1, 1.second)
val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()
val connected = upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(
      s"Connection failed: ${upgrade.response.status}"
    )
  }
}
So I currently have a Source of type Source[TextMessage.Strict, NotUsed], but I want to use the commented-out code where I have an ActorRef as my source.
I tried this:
val actorSource: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = { case Done =>
    CompletionStrategy.immediately
  },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)
val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef ! """{"auth":"APIKEY-123"}"""
val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
  actorSource
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()
When I use an ActorRef as my source, I am having a hard time fitting it into the graph. I am getting this compile-time error:
type mismatch;
[error]  found   : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]
[error]       .viaMat(webSocketFlow)(
Note: I want an Actor as my source and also as my sink, i.e. all messages that result from the flow should be passed to another actor acting as the sink.
Can someone explain what I am doing wrong with my Actor as a source when trying to add it to my flow/graph?
Update
Here is the code I have now:
def main(args: Array[String]): Unit = {
  implicit val system = ActorSystem()
  import system.dispatcher
  val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
      case _ =>
      // ignore other message types
    }
  val actorSource = Source.actorRef[String](
    completionMatcher = { case Done =>
      CompletionStrategy.immediately
    },
    failureMatcher = PartialFunction.empty,
    bufferSize = 100,
    overflowStrategy = OverflowStrategy.dropHead
  )
  val webSocketFlow = Http().webSocketClientFlow(
    WebSocketRequest("wss://socket.polygon.io/stocks")
  )
  // the materialized value is a tuple with
  // upgradeResponse is a Future[WebSocketUpgradeResponse] that
  // completes or fails when the connection succeeds or fails
  // and closed is a Future[Done] with the stream completion from the incoming sink
  val ((sendActor, upgradeResponse), closed) =
    actorSource
      .viaMat(webSocketFlow)(
        Keep.both
      ) // keep the materialized Future[WebSocketUpgradeResponse]
      .toMat(incoming)(Keep.both) // also keep the Future[Done]
      .run()
  // just like a regular http request we can access response status which is available via upgrade.response.status
  // status code 101 (Switching Protocols) indicates that server support WebSockets
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(
        s"Connection failed: ${upgrade.response.status}"
      )
    }
  }
  sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
  sendActor ! TextMessage("""{"topic":"topic123"}""")
  //in a real application you would not side effect here
  connected.onComplete(println)
  closed.foreach(_ => println("closed"))
}
I am getting the following compile errors:
[error] The argument types of an anonymous function must be fully known. (SLS 8.5)
[error] Expected type was: ?
[error]       completionMatcher = { case Done =>
[error]                           ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any
[error]     val connected = upgradeResponse.flatMap { upgrade =>
[error]                                     ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any
[error]     sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
[error]               ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any
[error]     sendActor ! TextMessage("""{"topic":"topic123"}""")
[error]               ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any
[error]     closed.foreach(_ => println("closed"))
[error]            ^
[error] 5 errors found
Your compiler error arises because your actorSource emits String rather than Message (that isn't the error you'd get with the code as posted; perhaps you tried changing it to a Source[String, ActorRef]?). Since the webSocketFlow only processes Messages, it can only be attached to a source of Message.
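If you would rather keep sending plain Strings to the actor, one option is to map them to TextMessage before the WebSocket flow. The following is only a sketch, assuming Akka 2.6+ with akka-http on the classpath; the object name StringToMessageSketch is made up for illustration, and Sink.seq stands in for the WebSocket flow so the snippet runs without a network connection. It also uses CompletionStrategy.draining instead of immediately so that buffered elements are emitted before the stream completes.

```scala
import akka.Done
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name, for illustration only.
object StringToMessageSketch {
  def run(): Seq[Message] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      // Explicitly typed matchers, so the compiler never has to infer them.
      val completion: PartialFunction[Any, CompletionStrategy] = {
        case Done => CompletionStrategy.draining // emit buffered elements first
      }
      val failure: PartialFunction[Any, Throwable] = PartialFunction.empty

      // The actor accepts Strings...
      val stringSource: Source[String, ActorRef] = Source.actorRef[String](
        completionMatcher = completion,
        failureMatcher = failure,
        bufferSize = 100,
        overflowStrategy = OverflowStrategy.dropHead
      )
      // ...and the stream turns them into Messages, which is the element
      // type a Flow[Message, Message, _] such as webSocketFlow expects.
      val messageSource: Source[Message, ActorRef] =
        stringSource.map(text => TextMessage(text))

      // Sink.seq is a stand-in for `.viaMat(webSocketFlow)(...)`.
      val (ref, collected) = messageSource.toMat(Sink.seq)(Keep.both).run()
      ref ! """{"auth":"APIKEY-123"}"""
      ref ! """{"topic":"topic123"}"""
      ref ! Done // matched by `completion`, completes the stream
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

With this shape, messageSource can be attached to webSocketFlow in the same way as the outgoing source in the question.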
So I suggest something along the lines of:
val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
  case Done => CompletionStrategy.immediately
}
val actorSource = Source.actorRef[Message](
  completionMatcher = immediateCompletion,
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)
val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)
val ((sendActor, upgradeResponse), closed) =
  actorSource
    .viaMat(webSocketFlow)(Keep.both) // keep both the actor and the upgradeResponse
    .toMat(incoming)(Keep.both) // ...and also keep the closed
    .run()
sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
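The question also asks for an actor on the receiving side. One possible way to do that is Sink.actorRef, sketched below under the assumption of Akka 2.6+ (classic actors). Collector, ActorSinkSketch and connect are hypothetical names introduced for this example, not part of Akka. The flow is passed in as a parameter so the wiring can be tried with a plain Flow[Message] before pointing it at a real WebSocket.

```scala
import akka.Done
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Status}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical receiving actor: collects strict text frames and hands them
// over through a Promise once the stream signals completion.
class Collector(result: Promise[Vector[String]]) extends Actor {
  private var seen = Vector.empty[String]
  def receive: Receive = {
    case TextMessage.Strict(text) => seen :+= text
    case Done                     => result.trySuccess(seen) // onCompleteMessage
    case Status.Failure(ex)       => result.tryFailure(ex)   // onFailureMessage
    case _                        => // ignore other message types
  }
}

object ActorSinkSketch {
  // Wires actor source -> flow -> actor sink and returns the ActorRef
  // that feeds the stream.
  def connect(flow: Flow[Message, Message, Any], receiver: ActorRef)(
      implicit system: ActorSystem
  ): ActorRef = {
    val completion: PartialFunction[Any, CompletionStrategy] = {
      case Done => CompletionStrategy.draining
    }
    val failure: PartialFunction[Any, Throwable] = PartialFunction.empty
    val source = Source.actorRef[Message](
      completionMatcher = completion,
      failureMatcher = failure,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )
    // Every element leaving the flow is sent to `receiver` as a message.
    val sink = Sink.actorRef[Message](
      receiver,
      onCompleteMessage = Done,
      onFailureMessage = (ex: Throwable) => Status.Failure(ex)
    )
    source.via(flow).toMat(sink)(Keep.left).run()
  }
}
```

For the real connection, the webSocketFlow value from above could be passed as the flow argument; use viaMat with Keep.both inside connect if you still need the upgradeResponse. Note that Sink.actorRef applies no backpressure to the receiving actor; Sink.actorRefWithBackpressure is the alternative if that matters.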