Search code examples
spring-bootrsocketspring-rsocket

rsocket routing metadata using RSocket-Java for Spring Rsocket Server


How do I setup routing metadata (in payload using just RSocket-Java when server is using Spring Boot Rsocket.

Flux<Payload> s = connection.flatMapMany(requester -> requester.requestStream(DefaultPayload.create("Some Message")))

Server is using @MessageMapping("/route")


Solution

  • Interaction type

    RSocket interaction type on SpringBoot using @MessageMapping is decided based on signature of annotated method (more info in spring docs)

    Let's assume it is having signature:

    @MessageMapping("/route")
    Flux<String> getStreamOfStrings(String message) {...}
    

    Based on cardinality table from spring docs interaction type is Request-Stream.

    RSocket client

    RSocket java client needs to have specified mime-type for metadata:

    RSocket rsocketClient = RSocketConnector.create()
        //metadata header needs to be specified
        .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
        // value of spring.rsocket.server.port eg 7000
        .connect(TcpClientTransport.create(7000))
        .block();
    

    Data

    Data will be simple string:

    ByteBuf data = ByteBufAllocator.DEFAULT.buffer().writeBytes("request msg".getBytes());
    

    Metadata

    Routing in RSocket is defined as metadata extension and needs to be sent together with data to specify routing. Here is example how it can be created (see other classes in package io.rsocket.metadata)

    CompositeByteBuf metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
    RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of("/route"));
    CompositeMetadataCodec.encodeAndAddMetadata(metadata,
            ByteBufAllocator.DEFAULT,
            WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
            routingMetadata.getContent());
    

    Request-stream request

    Data and metadata are created so you can execute requestSteam using:

    rsocketClient.requestStream(DefaultPayload.create(data, metadata))
        .map(Payload::getDataUtf8)
        .toIterable()
        .forEach(System.out::println);