I am working on a Mule API flow testing out the Salesforce event streams. I have my connector set up and subscribed to a streaming channel.
This is working just fine when I create / update / delete contact records, the events come through and I process them by adding them to another database.
I am slightly confused with the replayId
functionality. With the current setup, I can shut down the Mule app, create contacts in the org, and then when I bring the app back online, it resumes by adding data from where it left off. Perfect.
However, I am trying to simulate what would happen if the mule app crashed while processing the events.
I ran some APEX to create 100 random contact records. As soon as I see it log the first flow in my app, I kill the mule app. My assumption here was that it would know where it left off when I resume the app, as if it was offline prior to the contact creation like in the previous test.
What I have noticed is that it only processes the few contacts that made it through before I shut the app down.
It appears that the events may be coming in so quickly in the flow input, that it has already reached the last replayId
in the stream. However, since these records still haven't been added to my external database, I am losing those records. The stream did what it was supposed to do, but due to the batch of work the app is still processing, my 100 records are not being committed like the replayId
reflects.
How can I approach this so that I don't end up losing data in the event there is a large stream of data prior to an app crash? I remember with Kafka, you had to were able to commit
the id once it was inserted into the database so that it knew that the last one you officially processed. Is there such a concept in Mule where I can tell it where I have officially left off and committed to the DB?
Reliability at the protocol (CometD) level implies a number of properties. Chief among them is a transactional ACK(nowledgement) of the message having been received by the subscriber. CometD supports ACKs as an extension. Salesforce's implementation of CometD doesn't support ACKs. Even if it did, you'd still have issues...but the frequency/loss of risk might be lower.
In your case you have to engineer a solution that amounts to finding and replaying events that were not committed to your target database. You do this using custom code or wiring adapters in Mule. Replay ID values are not guaranteed to be contiguous for consecutive events but they will be ordered. Event A with replay ID of 100 will be followed by event B with replay ID of 200.
You will need to store a replay ID value in your DB. You can then use it on resubscription (after subscriber failure) to retrieve events from SF that are missing from your DB. This will only work if the failure window is small enough. Salesforce event retention window is currently at 24 hours for standard platform event license. Higher-level licenses allow for longer retention.
Depending on the volume of data, frequency of events and other process parameters, you could get all of this out of the box with Heroku Connect. It does imply a Postgres DB on Heroku + licensing cost of HC and operational costs but most of our customers in similar circumstances find it worthwhile.