I am a relative newbie to ReactiveX and have learned about Rx.Observable.take and Rx.Observable.takeLast, which take from the beginning and end of a sequence respectively, and Rx.Observable.windowWithCount, which takes potentially overlapping windows from an original observable.

For fun, I'd like to write the FFT algorithm entirely using reactive operators and transducers. Some of the algorithm is intuitive to port, but some is difficult to model with streams. Specifically, an rfft acts upon values from both the beginning and the end of a sequence, and I can't figure out how to do that. More specifically, if I have:
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
it would be broken up into observable windows of:
[[0,1,14,15],[2,3,12,13],[4,5,10,11],[6,7,8,9]]
Is there an elegant way to do this for any arbitrary observable sequence?
I have to say that I don't think it is a good idea to use reactive operators on finite streams like this, because there are no events to react to and no backpressure to handle. You also have to know the length of the stream and that it is finite. The best solution would be to use arrays, which give you O(1) random access. Nevertheless, here is a possible solution, although it burns a lot of CPU cycles. I used RxJava2-RC5 for testing.
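import io.reactivex.Observable;
import org.junit.Test;

@Test
public void fftAlgo() {
    // Same 16 values as in the question; the length must be known up front.
    int n = 16;
    Observable<Integer> ascendingOrdered = Observable.just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
            .concatWith(Observable.just(10, 11, 12, 13, 14, 15));
    // sorted() has to buffer the whole stream before it can emit anything.
    Observable<Integer> descendingOrdered = ascendingOrdered.sorted((t1, t2) -> t2.compareTo(t1));

    // Pair windows from the front half with windows from the back half.
    // Without take(n / 2) the zip would run past the middle and mirror every window.
    Observable.zip(
            ascendingOrdered.take(n / 2).window(2),
            descendingOrdered.take(n / 2).window(2),
            // Re-sort the back window so it reads in ascending order, e.g. 14, 15.
            (w1, w2) -> w1.concatWith(w2.sorted()))
        .flatMap(window -> {
            System.out.println("windowX");
            return window;
        })
        .subscribe(integer -> System.out.println(integer));
    // Every source here is synchronous, so no Thread.sleep is needed.
}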
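
For comparison, here is a rough sketch of the array-based approach mentioned above, using plain Java lists instead of Rx. The class name EdgeWindows, the method edgeWindows and its parameter half are made up for illustration, and the sketch assumes the input length is a multiple of 2 * half.

```java
import java.util.ArrayList;
import java.util.List;

final class EdgeWindows {

    // Hypothetical helper: each window holds `half` values taken from the
    // front of the list followed by the matching `half` values from the back.
    // Assumes values.size() is a multiple of 2 * half.
    static <T> List<List<T>> edgeWindows(List<T> values, int half) {
        int n = values.size();
        if (half <= 0 || n % (2 * half) != 0) {
            throw new IllegalArgumentException("length must be a multiple of 2 * half");
        }
        List<List<T>> windows = new ArrayList<>();
        // Walk from the front towards the middle; the back slice mirrors the front one.
        for (int start = 0; start < n / 2; start += half) {
            List<T> window = new ArrayList<>(values.subList(start, start + half));
            window.addAll(values.subList(n - start - half, n - start));
            windows.add(window);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            input.add(i);
        }
        // prints [[0, 1, 14, 15], [2, 3, 12, 13], [4, 5, 10, 11], [6, 7, 8, 9]]
        System.out.println(edgeWindows(input, 2));
    }
}
```

If the data really does arrive as an Observable, one option is to collect it with toList() first and then hand the resulting list to a helper like this one, which avoids sorting the stream a second time.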