I know I must be missing something basic here. I'm familiar with RX Request(n) semantics for backpressure, but I'm struggling to make it work with rsocket-go. What I tried was calling DoOnNext
and then blocking inside that callback, but rsocket appears to just keep buffering and filling up memory. How do I prevent it from receiving further messages until I've processed the ones I already have?
I was able to solve this by following the test case here.
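Basically, keep a reference to the subscription, request one item up front, and request the next one only after the current one has been handled: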
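// su holds the subscription so the DoOnNext callback can request more items.
var su rx.Subscription
f.DoOnNext(func(input payload.Payload) error {
	// Process input here, then ask for exactly one more item.
	su.Request(1)
	return nil
}).
	Subscribe(context.Background(), rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
		su = s
		// Initial demand of one item.
		su.Request(1)
	}))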