Search code examples
rsocket

How to apply backpressure with rsocket-go?


I know I must be missing some basic here. I'm familiar with RX Request(n) semantics for backpressure, but I'm struggling to make it work with rsocket-go. What I tried was calling DoOnNext and then blocking inside that call, but rsocket appears to just continue buffering and filling up memory. How do I prevent it from receiving further messages until I've processed them?


Solution

  • I was able to solve this by following the test case here.

    Basically:

    var su rx.Subscription
    
    f.DoOnNext(func(input payload.Payload) error {
            su.Request(1)
            return nil
    }).
    Subscribe(context.Background(), rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
            su = s
            su.Request(1)
    }))