How to handle message sent from server to client with RSocket?


I am trying to use RSocketRequester to send a message from the server to a specific client, but I don't know how to handle it on the frontend. The server is Spring WebFlux with a controller like this:

data class Message(val message: String)

@Controller
class RSocketController {

    private val log = LoggerFactory.getLogger(RSocketController::class.java)

    @MessageMapping("say.hello")
    fun sayHello(message: String): Flux<Message> {
        log.info("say hello {}", message)
        return Flux.just(Message("server says hello"))
    }

    @MessageMapping("say.hi")
    fun sayHi(message: String, rSocketRequester: RSocketRequester): Flux<Message> {
        log.info("say hi {}", message)
        rSocketRequester
                .route("say.hello")
                .data(Message("server says hi hello ;)"))
                .send()
                .subscribe()
        return Flux.just(Message("server says hi!!"))
    }

}

On the frontend I use rsocket-js. The sayHello method works fine (request-stream), but when I call sayHi I want the server to send two messages: the first to the say.hello endpoint and the second to the say.hi endpoint. My rsocket-js implementation looks like this:

    sayHello() {
      console.log("say hello");
      this.requestStream("say.hello");
    },
    sayHi() {
      console.log("say hi");
      this.requestStream("say.hi");
    },
    connect() {
      const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8080/rsocket"
      });
      const client = new RSocketClient({        
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {                  
          keepAlive: 60000,          
          lifetime: 180000,
          dataMimeType: "application/json",
          metadataMimeType: "message/x.rsocket.routing.v0"
        },
        transport
      });
      client.connect().subscribe({
        onComplete: socket => {
          this.socket = socket;
          console.log("complete connection");          
        },
        onError: error => {
          console.log("got connection error");
          console.error(error);
        },
        onSubscribe: cancel => {
          console.log("subscribe connection");
          console.log(cancel);          
        }        
      });
    },    
    requestStream(url) {
      if (this.socket) {
        this.socket
          .requestStream({
            data: url + " from client",
            metadata: String.fromCharCode(url.length) + url
          })
          .subscribe({
            onComplete: () => console.log("requestStream done"),
            onError: error => {
              console.log("got error with requestStream");
              console.error(error);
            },
            onNext: value => {
              // console.log("got next value in requestStream..");
              console.log("got data from server");
              console.log(value.data);
            },
            // Nothing happens until `request(n)` is called
            onSubscribe: sub => {
              console.log("subscribe request Stream!");
              sub.request(2147483647);
              // sub.request(3);
            }
          });
      } else {
        console.log("not connected...");
      }
    }
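
The `metadata` value built in `requestStream` above follows the layout of `message/x.rsocket.routing.v0`: one length byte followed by the route itself. A minimal sketch of that encoding as a standalone helper is shown here. The name `encodeRoute` is hypothetical (it is not part of rsocket-js), and the 127-character limit is an assumption that keeps the length prefix a single byte when the string is later UTF-8 encoded.

```javascript
// Hypothetical helper, not part of rsocket-js: builds routing metadata
// as a string of the form <length char><route>.
function encodeRoute(route) {
  // Assumption: ASCII routes of at most 127 characters, so the length
  // prefix stays a single byte after UTF-8 encoding.
  if (typeof route !== "string" || route.length === 0 || route.length > 127) {
    throw new RangeError("route must be a string of 1..127 characters");
  }
  return String.fromCharCode(route.length) + route;
}

// Same value as String.fromCharCode(url.length) + url in requestStream
const metadata = encodeRoute("say.hi");
console.log(metadata.charCodeAt(0), metadata.slice(1));
```

With such a helper, the `metadata` field in `requestStream` could be written as `encodeRoute(url)`.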

I can see both messages in Google Chrome DevTools -> Network -> rsocket, so the client receives them, but I can't catch the one sent by RSocketRequester in my code. It seems that the server uses the fireAndForget method. How do I handle it on the client side?


Solution

  • As @VladMamaev said, we can provide a responder to the client, as in this example: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/LeaseClientExample.js#L104

    For me, implementing only the fireAndForget method is enough.

    export class EchoResponder {
      constructor(callback) {
        this.callback = callback;
      }
      fireAndForget(payload) {  
        this.callback(payload);
      }  
    }
    
    import { EchoResponder } from "~/assets/EchoResponder";
    ...
    
    const messageReceiver = payload => {
      // do what you want to do with the received message
      console.log(payload);
    };
    const responder = new EchoResponder(messageReceiver);

    connect() {
      const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8080/rsocket"
      });
      const client = new RSocketClient({        
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {                  
          keepAlive: 60000,          
          lifetime: 180000,
          dataMimeType: "application/json",
          metadataMimeType: "message/x.rsocket.routing.v0"
        },
        responder: responder,
        transport
      });
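
If the server pushes messages to more than one route, the responder can inspect the routing metadata of each payload and dispatch on it. The following is only a sketch under a few assumptions: `RoutingResponder` and `decodeRoute` are hypothetical names (not part of rsocket-js), the metadata is assumed to arrive as a string because of `IdentitySerializer`, and `payload.data` is assumed to be already parsed by `JsonSerializer`. The payload at the bottom is simulated so the example runs without a server.

```javascript
// Hypothetical helper: reads the first length-prefixed route
// from message/x.rsocket.routing.v0 metadata given as a string.
function decodeRoute(metadata) {
  if (typeof metadata !== "string" || metadata.length === 0) {
    return null;
  }
  const length = metadata.charCodeAt(0);
  return metadata.substring(1, 1 + length);
}

// Hypothetical responder: dispatches fire-and-forget payloads by route.
class RoutingResponder {
  constructor() {
    this.handlers = new Map();
  }
  // Registers a callback for a route; returns this to allow chaining.
  on(route, handler) {
    this.handlers.set(route, handler);
    return this;
  }
  // Called for every fire-and-forget frame sent by the server.
  fireAndForget(payload) {
    const route = decodeRoute(payload.metadata);
    const handler = this.handlers.get(route);
    if (handler) {
      handler(payload.data);
    } else {
      console.warn("no handler registered for route", route);
    }
  }
}

const received = [];
const routingResponder = new RoutingResponder().on("say.hello", data => {
  received.push(data.message);
});

// Simulated payload, shaped like the one the controller sends to "say.hello"
routingResponder.fireAndForget({
  data: { message: "server says hi hello ;)" },
  metadata: String.fromCharCode(9) + "say.hello"
});
console.log(received);
```

An instance of such a class could be passed as the `responder` option of `RSocketClient` in place of `EchoResponder`.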