Search code examples
rxjsrx-javareactive-programming

How to pipe rx operators to combine fragment data?


I want to use Rx to deal with serial port data, which the packet structure look like this.

+-----------+--------+---------+
| Signature | Length | Payload |
+-----------+--------+---------+
| 2 byte    | 1 byte | ...     |
+-----------+--------+---------+

But there would be many fragment of the received data. Such like (Signature are 0xFC 0xFA)
Data 1: 0xFC 0xFA 0x02 0x01 0x01 0xFC 0xFA 0x03 0x01 // Contain one packet and a fragment packet
Data 2: 0x02 0x03 0xFC 0xFA 0x02 0x01 0x03 // Contain the continued fragment of previous and a new packet

How to pipe the operators to output as
Packet1: 0xFC 0xFA 0x02 0x01 0x01
Packet2: 0xFC 0xFA 0x03 0x01 0x02 0x03
...


Solution

  • You are splitting a stream of bytes by a defined pattern. I'm not sure how you receive your bytes and the way you'll model your observable, Observable<byte> or Observable<byte[]> !?

    Anyway, here what I've guessed translated in strings, but the idea still the same. I've chosen x followed by y as a pattern (0xFC 0xFA in your case).

    You'll find my comments in the code :

    final ImmutableList<String> PATTERN = ImmutableList.of("x", "y");
    
    Observable<String> source = Observable
            .fromArray("x", "y", "1", "2", "3", "x", "y", "4", "5", "x", "y", "1", "x", "y", "x", "4", "6", "x")
            .share();//publishing to hot observable (we are splitting this source by some of its elements)
    
    //find the next pattern
    Observable<List<String>> nextBoundary = source
            .buffer(2, 1)
            .filter(pairs -> CollectionUtils.isEqualCollection(PATTERN, pairs));
    
    //https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png
    //start a buffer for each x found
    //buffers (packets) may overlap
    source.buffer(source.filter(e -> e.equals("x")),
            x -> source
                    .take(1)//next emission after the x
                    .switchMap(y -> y.equals("y") ?
                            nextBoundary // if 'y' then find the next patter
                            : Observable.empty() //otherwise stop buffering
                    )
    )
            .filter(packet -> packet.size() > 2)//do not take the wrong buffers like ["x", "4"] (x not followed by y) but it is not lost
            .map(packet -> {
                //each packet is like the following :
                //[x, y, 1, 2, 3, x, y]
                //[x, y, 4, 5, x, y]
                //[x, y, 1, x, y]
                //[x, y, x, 4, 6, x]
                //because of the closing boundary, the event comes too late
                //then we have to handle the packet (it overlaps on the next one)
                List<String> ending = packet.subList(packet.size() - 2, packet.size());
                return CollectionUtils.isEqualCollection(PATTERN, ending) ? packet.subList(0, packet.size() - 2) : packet;
            })
            .blockingSubscribe(e -> System.out.println(e));
    

    Result:

    [x, y, 1, 2, 3]
    [x, y, 4, 5]
    [x, y, 1]
    [x, y, x, 4, 6, x]