Tags: java, distributed-computing, cqrs, failover, xa

Parallelism and Failover of Sequential Data


Good day, everyone!

We have a pretty straightforward adapter application: every 30 seconds it reads records from the database of one system (we can't write to it), converts each record into an internal format, performs filtering, enrichment, etc., and finally transforms the resulting entities into XML and sends them via JMS to another system. Nothing new.

Now let's add some spice: records in the database are sequential (their identifiers are generated by a sequence). When it is time to read a new bunch of records, we get the last-processed-sequence-number, which is stored in our internal database and updated each time a record is processed (sent to JMS), and start reading from the next record (that number + 1).

The problem is that our customers gave us an NFR (non-functional requirement): processing a bunch of read records must not take longer than 30 seconds. Since the workflow has many steps (some of them fairly long-running), a single read can return a large number of records, and we process them one by one, processing can take more than 30 seconds.

Given all of the above, I have two questions:

1) Is there an approach to processing sequential data in parallel (perhaps with one or more intermediate stores, the Disruptor pattern, something CQRS-like, a notification-based design, ...) that would work in such a system?

2) A more general one. I need to store the last-processed number and send an entity to JMS. If I save the number to the database first and then something goes wrong with JMS, after a restart my adapter will assume it successfully sent the entity, which is not true, and the entity will never be received. If I send the entity first and then get an exception while saving the number to the database, after a restart the record will be reprocessed, which leads to duplicates in JMS. I'm not sure whether XA transactions or something like the Last Resource Gambit would help here...
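To make the two failure windows in question 2 concrete, here is a small in-memory simulation. It is only an illustration under stated assumptions: `FakeQueue` stands in for the JMS destination, `Store` for the internal table holding the last-processed number, and a "crash" between the two writes is modeled by returning early. All names are hypothetical, not part of the original adapter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the two write orderings; a crash is modeled by an early return. */
public class DualWriteDemo {

    /** Stand-in for the JMS destination: just records which ids were sent. */
    static final class FakeQueue {
        final List<Long> sent = new ArrayList<>();
        void send(long id) { sent.add(id); }
    }

    /** Stand-in for the internal table holding last-processed-sequence-number. */
    static final class Store {
        long lastProcessed = 0;
    }

    /**
     * Processes source ids after store.lastProcessed (i.e. from last-processed + 1).
     * saveFirst chooses the ordering; crashOnId simulates a crash between the two writes.
     */
    static void runOnce(List<Long> source, Store store, FakeQueue queue,
                        boolean saveFirst, long crashOnId) {
        for (long id : source) {
            if (id <= store.lastProcessed) continue; // already processed according to the store
            if (saveFirst) {
                store.lastProcessed = id;
                if (id == crashOnId) return;         // crash: number saved, message never sent
                queue.send(id);
            } else {
                queue.send(id);
                if (id == crashOnId) return;         // crash: message sent, number never saved
                store.lastProcessed = id;
            }
        }
    }

    public static void main(String[] args) {
        List<Long> source = List.of(1L, 2L, 3L);

        // Ordering A: save the number, then send. Crash on id 2, then restart.
        Store a = new Store();
        FakeQueue qa = new FakeQueue();
        runOnce(source, a, qa, true, 2L);
        runOnce(source, a, qa, true, -1L);
        System.out.println("save-then-send: " + qa.sent); // [1, 3]: id 2 is lost

        // Ordering B: send, then save the number. Crash on id 2, then restart.
        Store b = new Store();
        FakeQueue qb = new FakeQueue();
        runOnce(source, b, qb, false, 2L);
        runOnce(source, b, qb, false, -1L);
        System.out.println("send-then-save: " + qb.sent); // [1, 2, 2, 3]: id 2 is duplicated
    }
}
```

Whichever write goes first, a crash between the two leaves either a lost message or a duplicate; that is the gap the question is asking how to close.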

Could somebody please share their experience or ideas?

Thanks in advance!


Solution

  • Folks, we finally ended up with the following solution: we implemented a variant of the Actor Model. The idea is as follows.

    There are two main (internal) database tables in our application; let's call them READ_DATA_INFO, which holds the last-read-record-number of the 'source' external system, and DUMPED_DATA, which stores metadata about each record read from the source system. Here is how it all works: every n seconds (a configurable property), a service bus reads the last processed identifier of the source system and requests new records from it. If there are any new records, they are wrapped in a DumpRecordBunchMessage and sent to a DumpActor class.

    The DumpActor begins a transaction comprising two operations: updating the last-read-record-number (the READ_DATA_INFO table) and saving metadata about each record (the DUMPED_DATA table). Each dumped record gets the 'NEW' status; when a record is processed successfully it gets the 'COMPLETED' status, otherwise the 'FAILED' status. If the transaction commits successfully, each of those records is wrapped in a RecordMessage and sent to the next processing actor; otherwise the records are simply skipped and will be re-read after the next n seconds.
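    The DumpActor step can be sketched in memory as follows. This is a minimal illustration, not the actual implementation: two Java collections stand in for READ_DATA_INFO and DUMPED_DATA, "staging both writes and applying them together" stands in for the database transaction, and `failNextCommit` is a hypothetical hook for simulating a failed commit. DumpActor, DumpRecordBunchMessage and RecordMessage come from the description above; everything else is assumed (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory sketch of the DumpActor step. */
public class DumpActorSketch {

    enum Status { NEW, COMPLETED, FAILED }

    record DumpRecordBunchMessage(List<Long> recordIds) {}
    record RecordMessage(long recordId) {}

    static final class DumpActor {
        long lastReadRecordNumber = 0;                              // stands in for READ_DATA_INFO
        final Map<Long, Status> dumpedData = new LinkedHashMap<>(); // stands in for DUMPED_DATA
        boolean failNextCommit = false;                             // hypothetical hook: simulate a DB error
        private final Consumer<RecordMessage> nextActor;

        DumpActor(Consumer<RecordMessage> nextActor) { this.nextActor = nextActor; }

        void onMessage(DumpRecordBunchMessage msg) {
            if (dumpInTransaction(msg.recordIds())) {
                // Committed: each record becomes an independent RecordMessage for the next actor.
                msg.recordIds().forEach(id -> nextActor.accept(new RecordMessage(id)));
            }
            // Not committed: nothing is dispatched; the records are re-read on the next poll.
        }

        /** Both writes are staged and applied together, or not applied at all. */
        private boolean dumpInTransaction(List<Long> ids) {
            Map<Long, Status> staged = new LinkedHashMap<>();
            long newLast = lastReadRecordNumber;
            for (long id : ids) {
                staged.put(id, Status.NEW);
                newLast = Math.max(newLast, id);
            }
            if (failNextCommit) {           // simulated failure: "roll back" by applying nothing
                failNextCommit = false;
                return false;
            }
            dumpedData.putAll(staged);      // write 1: metadata with status NEW
            lastReadRecordNumber = newLast; // write 2: advance last-read-record-number
            return true;
        }
    }

    public static void main(String[] args) {
        List<RecordMessage> dispatched = new ArrayList<>();
        DumpActor actor = new DumpActor(dispatched::add);

        actor.onMessage(new DumpRecordBunchMessage(List.of(1L, 2L, 3L)));
        actor.failNextCommit = true;
        actor.onMessage(new DumpRecordBunchMessage(List.of(4L, 5L)));

        System.out.println(actor.lastReadRecordNumber); // 3
        System.out.println(actor.dumpedData);           // {1=NEW, 2=NEW, 3=NEW}
        System.out.println(dispatched.size());          // 3
    }
}
```

    The key design point is that records are handed downstream only after the commit, so the persisted 'NEW' rows (rather than the JMS send) become the durable record of what was read.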

    There are three interesting points:

    • Disaster recovery. What if the application is stopped in the middle of processing? No problem: at startup (in a @PostConstruct-annotated method) we find all records with the 'NEW' status in the DUMPED_DATA table and, using the stored metadata, restore them from the source system.

    • Parallel processing. Once all records are successfully dumped, they become independent, which means they can be processed in parallel. We introduced several parallelism and load-balancing mechanisms; the simplest one is round robin. Each processing actor consists of a parent actor (the load balancer) and a configurable set of child actors (the workers). When a new message arrives in the parent actor's queue, the parent dispatches it to the next worker.

    • Duplicate record prevention. This is the most interesting one. Suppose we read data every 5 seconds. If an actor has a long-running operation, several read attempts may start from the same last-read-record-number, so potentially many duplicate records would be dumped and processed. To prevent this we added a CAS-like check on DumpActor's messages: if the last-read-record number carried by a message equals the one stored in the DUMPED_DATA table, the message is processed (no other message was processed before it); otherwise the message is rejected. Rather simple, but powerful.
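    The disaster-recovery point can be sketched as a startup scan over the dumped metadata. In a real application this would be the @PostConstruct-annotated method; here it is a plain static method so the example runs without a container. The map stands in for DUMPED_DATA, and `sourceSystem` is a hypothetical lookup that re-reads a record by its stored id (the real metadata may hold more than the id).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.LongFunction;

/** Hypothetical sketch of startup recovery of records left in the NEW status. */
public class RecoverySketch {

    enum Status { NEW, COMPLETED, FAILED }

    record RecordMessage(long recordId, String payload) {}

    /** Re-reads every NEW record from the source and re-dispatches it; returns how many were recovered. */
    static int recoverOnStartup(Map<Long, Status> dumpedData,
                                LongFunction<String> sourceSystem,
                                Consumer<RecordMessage> nextActor) {
        int recovered = 0;
        for (Map.Entry<Long, Status> e : dumpedData.entrySet()) {
            if (e.getValue() == Status.NEW) {
                // Use the stored metadata (here just the record id) to restore the record from the source.
                String payload = sourceSystem.apply(e.getKey());
                nextActor.accept(new RecordMessage(e.getKey(), payload));
                recovered++;
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        Map<Long, Status> dumped = new TreeMap<>(Map.of(
                1L, Status.COMPLETED, 2L, Status.NEW, 3L, Status.FAILED, 4L, Status.NEW));
        List<RecordMessage> out = new ArrayList<>();
        int n = recoverOnStartup(dumped, id -> "record-" + id, out::add);
        System.out.println("recovered " + n + ": " + out.stream().map(RecordMessage::recordId).toList());
    }
}
```

    Only NEW records are replayed; COMPLETED and FAILED ones already reached a final state before the shutdown.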
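    The round-robin parent/worker arrangement from the parallel-processing point can be approximated with plain java.util.concurrent. The post does not name an actor library, so this is only a sketch under that assumption: each worker "actor" is a single-thread ExecutorService whose task queue plays the role of its mailbox, and the parent hands each incoming message to the next worker in turn. `RoundRobinParent` and `tell` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical round-robin "parent actor" over a fixed set of single-threaded workers. */
public class RoundRobinSketch {

    static final class RoundRobinParent<M> implements AutoCloseable {
        private final List<ExecutorService> workers = new ArrayList<>();
        private final Consumer<M> handler;
        private int next = 0; // index of the worker that receives the next message

        RoundRobinParent(int workerCount, Consumer<M> handler) {
            this.handler = handler;
            for (int i = 0; i < workerCount; i++) {
                // One thread per worker: each worker handles its own queue one message at a time.
                workers.add(Executors.newSingleThreadExecutor());
            }
        }

        /** Dispatches the message to the next worker in turn and returns that worker's index. */
        synchronized int tell(M message) {
            int target = next;
            workers.get(target).execute(() -> handler.accept(message));
            next = (next + 1) % workers.size();
            return target;
        }

        /** Stops accepting work and waits for queued messages to finish. */
        @Override
        public void close() {
            workers.forEach(ExecutorService::shutdown);
            try {
                for (ExecutorService w : workers) w.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> targets = new ArrayList<>();
        try (RoundRobinParent<Long> parent = new RoundRobinParent<>(3,
                id -> System.out.println(Thread.currentThread().getName() + " processed record " + id))) {
            for (long id = 1; id <= 7; id++) {
                targets.add(parent.tell(id));
            }
        }
        System.out.println("dispatch order: " + targets); // [0, 1, 2, 0, 1, 2, 0]
    }
}
```

    Round robin ignores how busy each worker is, which is why it is the simplest of the balancing options; a slow record can still hold up the messages queued behind it on the same worker.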
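    The CAS-like duplicate guard can be sketched with an AtomicLong whose compareAndSet stands in for the stored last-read-record number; in the real application the comparison and the update would both happen inside DumpActor's database transaction. Each bunch carries the number it was read from, and it is accepted only if that number still matches the stored one; acceptance advances the stored number in the same step. It assumes record ids in a bunch are in ascending sequence order, so the last id is the new last-read number. `DumpGuard` and `readFrom` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the CAS-like check on DumpActor's messages. */
public class CasGuardSketch {

    /** readFrom: the last-read-record number the poll started from. */
    record DumpRecordBunchMessage(long readFrom, List<Long> recordIds) {}

    static final class DumpGuard {
        private final AtomicLong lastReadRecordNumber; // stands in for the stored number

        DumpGuard(long initial) { lastReadRecordNumber = new AtomicLong(initial); }

        /** Returns true if the bunch should be dumped, false if it duplicates an earlier read. */
        boolean tryAccept(DumpRecordBunchMessage msg) {
            if (msg.recordIds().isEmpty()) return false;
            // Assumes ascending sequence ids: the last id becomes the new last-read number.
            long newLast = msg.recordIds().get(msg.recordIds().size() - 1);
            // Succeeds only if nobody advanced the number since this poll started.
            return lastReadRecordNumber.compareAndSet(msg.readFrom(), newLast);
        }

        long current() { return lastReadRecordNumber.get(); }
    }

    public static void main(String[] args) {
        DumpGuard guard = new DumpGuard(100);
        // Two polls started before the first bunch was dumped, so both read from 100.
        var first = new DumpRecordBunchMessage(100, List.of(101L, 102L, 103L));
        var second = new DumpRecordBunchMessage(100, List.of(101L, 102L, 103L, 104L));
        // A later poll that starts from the updated number.
        var third = new DumpRecordBunchMessage(103, List.of(104L));

        System.out.println(guard.tryAccept(first));  // true
        System.out.println(guard.tryAccept(second)); // false: duplicate read, rejected
        System.out.println(guard.tryAccept(third));  // true
        System.out.println(guard.current());         // 104
    }
}
```

    Record 104 from the rejected bunch is not lost: the next poll starts from the updated number and picks it up, as the third message shows.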

    I hope this overview helps somebody. Have a good time!