Search code examples
rx-javarx-java2rx-java3

RxJava receive data packets from multiple source and write to OutputStream with delay between each packet


I have Java TCP client Socket reading InputStream and distributing data packets to various parts of the application via RxJava PublishSubject. This works.

Also sometimes I write to OutputStream. Commands are converted into single data packet(byte[]) and pushed onto the stream. For this I use

public void writeToSocket(byte[] packet) {
    Completable.fromAction(() -> {
         outputStream.write(packet);
         outputStream.flush();
    }).subscribeOn(Schedulers.io()).subscribe(); 
}

Now I want to execute

    outputStream.write(packet);
    outputStream.flush();

in such a way that meets below condition

  1. Though source packet is getting created from multiple places (with different commands) simultaneously, execute above for each packet with a delay of 50 milliseconds. Ideally queue-up the packets and execute with delay.
Example:
Place1: createCommand1(), 
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()

Is there any way to achieve this using RxJava. Thanks in advance!


Solution

  • You could use a serialized PublishSubject to collect up bytes, then use concatMapCompletable to execute the write and then have a delay:

    var subject = PublishSubject.<byte[]>create().toSerialized();
    
    subject
      .concatMapCompletable(bytes -> 
           Completable.fromAction(() -> {
               outputStream.write(packet);
               outputStream.flush();
           })
           .subscribeOn(Schedulers.io())
           .andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
       )
       .subscribe();
    

    Alternatively, if you don't mind dedicating a single thread to the emission at all times, you could just execute the write and sleep in doOnNext:

    var subject = PublishSubject.<byte[]>create().toSerialized();
    
    subject
      .observeOn(Schedulers.io())
      .doOnNext(packet -> {
         outputStream.write(packet);
         outputStream.flush();
         Thread.sleep(50);
      })
      .subscribe();