I have Java TCP client Socket reading InputStream and distributing data packets to various parts of the application via RxJava PublishSubject. This works.
Also sometimes I write to OutputStream. Commands are converted into single data packet(byte[]) and pushed onto the stream. For this I use
public void writeToSocket(byte[] packet) {
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
}).subscribeOn(Schedulers.io()).subscribe();
}
Now I want to execute
outputStream.write(packet);
outputStream.flush();
in such a way that meets below condition
- Though source packet is getting created from multiple places (with different commands) simultaneously, execute above for each packet with a delay of 50 milliseconds. Ideally queue-up the packets and execute with delay.
Example:
Place1: createCommand1(),
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()
Is there any way to achieve this using RxJava. Thanks in advance!
You could use a serialized PublishSubject
to collect up bytes, then use concatMapCompletable
to execute the write and then have a delay:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.concatMapCompletable(bytes ->
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
})
.subscribeOn(Schedulers.io())
.andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
)
.subscribe();
Alternatively, if you don't mind dedicating a single thread to the emission at all times, you could just execute the write and sleep in doOnNext
:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.observeOn(Schedulers.io())
.doOnNext(packet -> {
outputStream.write(packet);
outputStream.flush();
Thread.sleep(50);
})
.subscribe();