
How to implement a single-use, ordered ReplaySubject?


How can I have a single-subscriber ReplaySubject that:

  1. buffers all events received with onNext() until someone subscribes to it,
  2. once someone subscribes to it, forwards all buffered events to the subscriber and erases them from the ReplaySubject buffer (for performance reasons); subsequent events are forwarded directly to the subscriber with no more buffering,
  3. if more than one observer subscribes to it, an exception is thrown,
  4. all buffered events given to the subject are ordered based on the time when each event was generated, and they are forwarded to the subscriber in that order during subscription?

Also, does this make sense? I think such a subject would have good use cases, e.g., when it forwards events from a file system.


Solution

  • There is UnicastSubject for this purpose. It is not part of the official API, though, which means it can change or be removed without warning.

    Edit

    Let me point you to the features you requested:

    1) buffers all events:

    If there is no child subscriber, or it hasn't caught up yet, the element is stored in the queue.

    2) When subscribed to, the buffer is consumed and replayed to the Subscriber

    The drain loop polls the queue and delivers each buffered element to the Subscriber, removing it from the queue. Once the Subscriber has caught up, new elements are emitted to it directly.

    3) if more than one observer subscribes to it, an exception is thrown

    Any subsequent subscriber will get an exception.

    4) all buffered events given to the subject are ordered based on the time when each event was generated

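    It uses a single-producer, single-consumer queue that guarantees FIFO order, so buffered events are replayed in the order in which they were passed to onNext(). Note that this is arrival order; the queue does not re-sort events by a timestamp.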
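
    To make the four points concrete, here is a minimal plain-Java sketch of the same idea. It is not the UnicastSubject source: the class name SingleUseSubject, its methods, and the use of synchronized with an ArrayDeque (instead of a lock-free drain loop over a single-producer, single-consumer queue) are assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical illustration only; not the RxJava UnicastSubject implementation. */
final class SingleUseSubject<T> {
    // FIFO buffer: events are replayed in the order onNext() received them.
    private final Queue<T> buffer = new ArrayDeque<>();
    // Stays null until the one allowed subscriber arrives.
    private Consumer<? super T> subscriber;

    /** Buffers the event if nobody has caught up yet, otherwise forwards it directly. */
    public synchronized void onNext(T value) {
        if (subscriber == null || !buffer.isEmpty()) {
            buffer.add(value);        // (1) no subscriber, or replay still in progress
        } else {
            subscriber.accept(value); // (2) subscriber caught up: no more buffering
        }
    }

    /** Registers the single subscriber and drains the buffer to it. */
    public synchronized void subscribe(Consumer<? super T> s) {
        if (subscriber != null) {
            // (3) a second subscriber is rejected
            throw new IllegalStateException("Only one subscriber allowed");
        }
        subscriber = s;
        T next;
        // (2)+(4) poll() removes each element as it is replayed, oldest first
        while ((next = buffer.poll()) != null) {
            s.accept(next);
        }
    }
}

class SingleUseSubjectDemo {
    public static void main(String[] args) {
        SingleUseSubject<String> subject = new SingleUseSubject<>();
        subject.onNext("a");            // buffered
        subject.onNext("b");            // buffered

        List<String> received = new ArrayList<>();
        subject.subscribe(received::add); // replays a, b and empties the buffer
        subject.onNext("c");              // forwarded directly

        System.out.println(received);     // prints [a, b, c]

        try {
            subject.subscribe(x -> { });
        } catch (IllegalStateException e) {
            System.out.println("second subscriber rejected");
        }
    }
}
```

    The synchronized methods keep the sketch short and make the ordering easy to reason about; a production subject would more likely avoid blocking the producer while the subscriber is being drained.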