Search code examples
javagokotlinrpcrsocket

Blocked Flux while waiting for last element


I want to connect two applications via rsocket. One is written in GO and second in Kotlin. I want to realize connection where client sends Stream of data and server send confirmation response.

The problem is with waiting for all elements, if server do not BlockOnLast(ctx), whole stream is read, but response is send before all entries arrive. If BlockOnLast(ctx) is added, Server (GoLang) is stuck.

I wrote also client in Kotlin, and in that case whole communication is working perfectly fine.

Do enyone may help?

GO Server:

package main

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"rsocket-go-rpc-test/proto"
)

func main() {
addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rsocket.Receive().
    Fragment(1024).
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
        return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(payloads rx.Publisher) flux.Flux {
                println("START")

                payloads.(flux.Flux).
                    DoOnNext(func(input payload.Payload) {
                        chunk := &pl_dwojciechowski_proto.Chunk{}
                        proto.Unmarshal(input.Data(), chunk)
                        println(string(chunk.Content))
                    }).BlockLast(ctx)

                return flux.Create(func(i context.Context, sink flux.Sink) {
                    status, _ := proto.Marshal(&pl_dwojciechowski_proto.UploadStatus{
                        Message: "OK",
                        Code:    0,
                    })

                    sink.Next(payload.New(status, make([]byte, 1)))
                    sink.Complete()
                    println("SENT")
                })
            }),
        ), nil
    }).
    Transport(addr).
    Serve(ctx)
panic(err)

}

Kotlin Client:

private fun clientCall() {
val rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8081)).start().block()
val client = FileServiceClient(rSocket)

val requests: Flux<Chunk> = Flux.range(1, 10)
    .map { i: Int -> "sending -> $i" }
    .map<Chunk> {
        Chunk.newBuilder()
            .setContent(ByteString.copyFrom(it.toByteArray())).build()
    }

val response = client.send(requests).block() ?: throw Exception("")
rSocket.dispose()
System.out.println(response.message)

}

And equivalent for GO written in Kotlin:

    val serviceServer = FileServiceServer(DefaultService(), Optional.empty(), Optional.empty())
val closeableChannel = RSocketFactory.receive()
    .acceptor { setup: ConnectionSetupPayload?, sendingSocket: RSocket? ->
        Mono.just(
            RequestHandlingRSocket(serviceServer)
        )
    }
    .transport(TcpServerTransport.create(8081))
    .start()
    .block()
    closeableChannel.onClose().block()

class DefaultService : FileService {
override fun send(messages: Publisher<Service.Chunk>?, metadata: ByteBuf?): Mono<Service.UploadStatus> {
    return Flux.from(messages)
        .windowTimeout(10, Duration.ofSeconds(500))
        .flatMap(Function.identity())
        .doOnNext { println(it.content.toStringUtf8()) }
        .then(Mono.just(Service.UploadStatus.newBuilder().setCode(Service.UploadStatusCode.Ok).setMessage("test").build()))
}
}

Server Output:

START
sending -> 1

Solution

  • Solution below:

    package main
    import (
       "context"
       "github.com/golang/protobuf/proto"
       "github.com/rsocket/rsocket-go"
       "github.com/rsocket/rsocket-go/payload"
       "github.com/rsocket/rsocket-go/rx"
       "github.com/rsocket/rsocket-go/rx/flux"
       "rsocket-go-rpc-test/proto"
    )
    type TestService struct {
       totals int
    pl_dwojciechowski_proto.FileService
    }
    var statusOK = &pl_dwojciechowski_proto.UploadStatus{
       Message: "code",
    Code:    pl_dwojciechowski_proto.UploadStatusCode_Ok,
    }
    var statusErr = &pl_dwojciechowski_proto.UploadStatus{
       Message: "code",
    Code:    pl_dwojciechowski_proto.UploadStatusCode_Failed,
    }
    func main() {
       addr := "tcp://127.0.0.1:8081"
    ctx, cancel := context.WithCancel(context.Background())
       defer cancel()
       err := rsocket.Receive().
          Fragment(1024).
          Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
             return rsocket.NewAbstractSocket(
                rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
                   dataReceivedChan := make(chan bool, 1)
                   toChan, _ := flux.Clone(msgs).
                      DoOnError(func(e error) {
                         dataReceivedChan <- false
    }).
                      DoOnComplete(func() {
                         dataReceivedChan <- true
    }).
                      ToChan(ctx, 1)
                   fluxResponse := flux.Create(func(ctx context.Context, s flux.Sink) {
                      gluedContent := make([]byte, 1024)
                      for c := range toChan {
                         chunk := pl_dwojciechowski_proto.Chunk{}
                         _ = chunk.XXX_Unmarshal(c.Data())
                         gluedContent = append(gluedContent, chunk.Content...)
                      }
                      if <-dataReceivedChan {
                         marshal, _ := proto.Marshal(statusOK)
                         s.Next(payload.New(marshal, nil))
                         s.Complete()
                      } else {
                         marshal, _ := proto.Marshal(statusErr)
                         s.Next(payload.New(marshal, nil))
                         s.Complete()
                      }
                   })
                   return fluxResponse
    }),
    ), nil
    }).
          Transport(addr).
          Serve(ctx)
       panic(err)
    }