
How to condense a stream of incrementing sequence numbers down to one?


I am listening to a server that sends me messages carrying sequence numbers. My client parses out the sequence number to detect duplicates and missed messages, but it is called generically by a wrapper object that expects a single incremental sequence number. Unfortunately, this particular server sends several different streams of sequence numbers, each incremental only within its own substream. In other words, a simpler server would send me:

1,2,3,4,5,7

and I would just report back 1,2,3,4,5,7, and the wrapper tool would notify me of having lost one message (6). Unfortunately, this more complex server sends me something like:

A1,A2,A3,B1,B2,A4,C1,A5,A7

(except the letters are actually numerical codes too, conveniently). The above has no gaps except for A6, but since I need to report a single number to the wrapper object, I cannot report:

1,2,3,1,2,4,1,5,7

because that will be interpreted incorrectly. As such, I want to condense, in my client, what I receive into a single incremental stream of numbers. The example

A1,A2,A3,B1,B2,A4,C1,A5,A7

should really translate to something like this:

1, 2, 3, 4 (because B1 is really the 4th unique message), 5, 6, 7, 8, 10 (since 9 could have been A6, B3, C2, or the first message of another letter)

This would then be picked up as having missed one message (A6). Another example sequence:

A1,A2,B1,A7,C1,A8

could be reported as:

1,2,3,8,9,10

because the first three are logically in a valid sequence with nothing missing. Then we get A7, which means we missed 4 messages (A3, A4, A5, and A6), so I report back 8 and the wrapper can tell. Then C1 comes in, which is fine, so I give it 9. A8 is now the next expected A, so I give it 10.

I am having difficulty figuring out a way to create this behavior though. What are some ways to go about it?


Solution

  • For each stream, check that the incoming sequence number is the one that stream expects next. Then emit the count of all valid messages seen so far as the aggregate sequence number. Pseudocode:

    function initialize()
        for streamId in streams do
            streams[streamId] = 1    // next expected seqno; streams start at 1 in the question's examples
        aggregateSeqno = 0
    
    function process(streamId, seqno)
        if seqno = streams[streamId] then
            streams[streamId] = seqno + 1
            aggregateSeqno = aggregateSeqno + 1
            return aggregateSeqno
        else
            try to fix streams[streamId] by replying to the server
    
    function main()
        initialize()
        while server not finished do
            (streamId, seqno) = receive()
            process(streamId, seqno)
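
If asking the server to resend is not an option, a variant of the same idea can reserve aggregate numbers for the messages that were skipped, which is the behavior the question's examples describe. The following is only a minimal Python sketch under some assumptions: every stream starts at sequence number 1, stream ids are hashable values, and a duplicate or late message is reported as `None` for the caller to handle. The names `SequenceCondenser`, `process`, and `first_seqno` are made up for illustration.

```python
class SequenceCondenser:
    """Maps (stream_id, seqno) pairs onto one increasing sequence."""

    def __init__(self, first_seqno=1):
        # Assumption: every stream begins at the same first sequence number.
        self.first_seqno = first_seqno
        self.expected = {}   # stream_id -> next seqno expected on that stream
        self.aggregate = 0   # last aggregate number handed out or reserved

    def process(self, stream_id, seqno):
        expected = self.expected.get(stream_id, self.first_seqno)
        if seqno < expected:
            # Duplicate or late message: its slot was already counted.
            # Returning None is an assumption; the caller decides what to do.
            return None
        # Reserve one aggregate number per skipped message, plus one for
        # the message that actually arrived.
        missed = seqno - expected
        self.aggregate += missed + 1
        self.expected[stream_id] = seqno + 1
        return self.aggregate


if __name__ == "__main__":
    condenser = SequenceCondenser()
    messages = [("A", 1), ("A", 2), ("B", 1), ("A", 7), ("C", 1), ("A", 8)]
    print([condenser.process(s, n) for s, n in messages])
    # prints [1, 2, 3, 8, 9, 10]
```

Because a gap of k messages on any stream advances the aggregate counter by k + 1, the wrapper sees exactly k missing numbers, without needing to know which stream they belonged to.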