
Akka delete persistent message(s)


I'm writing an application that handles events raised by equipment (1 million per hour). Some events are aggregated over a long time span (e.g. 48 hours) and consist of a begin event, any number of status events, and an end event. Others are single events that can be processed right away. To get an at-least-once guarantee that the events will be processed, I'm looking at akka-persistence. Other parts of the application already use Akka and Kafka.

The solution I was aiming for should contain a persistent map from which events can easily be looked up by their eventId. Ordering is of less importance. Once an event has been processed, it can be removed from the map (and should no longer be persisted).

In the docs and examples I found, there are queue examples that satisfy the per-event purge requirement but struggle with the easy lookup (the queue has to be scanned to find the event). To satisfy the easy lookup, I thought of using a map together with the PersistentActor trait and some database underneath. However, events are purged by sequence number, which would also remove events that need more processing or are waiting for other events to occur. Another trait I investigated is AtLeastOnceDelivery, whose delivery confirmation satisfies the requirements, but it blocks on recovery until all events are processed.
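
The sequence-number concern can be illustrated with a small model in plain Scala. This is not Akka code: `JournalPurgeModel`, `JournalEntry`, `purgeUpTo`, and the sample event ids are hypothetical names. They only imitate how a purge up to a given sequence number (which is what `deleteMessages(toSequenceNr)` does in akka-persistence) discards everything written before that point, including entries for events that are still in progress.

```scala
// Plain-Scala model of a journal purged by sequence number.
// All names here are hypothetical; this is not the Akka API.
object JournalPurgeModel {

  final case class JournalEntry(sequenceNr: Long, eventId: Int, note: String)

  // Keeps only the entries written after the given sequence number.
  def purgeUpTo(journal: Vector[JournalEntry], toSequenceNr: Long): Vector[JournalEntry] =
    journal.filter(_.sequenceNr > toSequenceNr)

  val journal: Vector[JournalEntry] = Vector(
    JournalEntry(1L, eventId = 100, note = "begin event of a long-running aggregate"),
    JournalEntry(2L, eventId = 200, note = "single event"),
    JournalEntry(3L, eventId = 300, note = "single event")
  )

  // Event 200 has been processed, so the journal is purged up to its sequence number.
  val afterPurge: Vector[JournalEntry] = purgeUpTo(journal, toSequenceNr = 2L)

  // The purge also dropped event 100, although it is still waiting for its end event.
  val lostPending: Boolean = !afterPurge.exists(_.eventId == 100)
}
```

In this model only the entry for event 300 survives the purge, which is the behaviour that makes purging by sequence number unsuitable for per-event removal.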

Any thoughts on how to implement a persistent basket for events in Akka? (I'm using Scala, by the way.)


Solution

  • Would something like this fit your needs? It's probably not exactly your logic, but basically the actor receives a new event, persists the fact that it received the event, and then saves the event into the map under its id. At some point (I'm not sure how you trigger the event processing) it receives a command to handle the event with a specific id. It persists the fact that the event is being handled, then handles the event and removes it from the map. That way the map is restored on restart, and all events that haven't been processed yet remain accessible by id.

    import akka.persistence.PersistentActor

    class PersistentMapActor extends PersistentActor {
        // Bring Event and the command/event case classes into scope
        import PersistentMapActor._

        // Unprocessed events, keyed by eventId
        private var eventMap: Map[Int, Event] = Map.empty

        // Replays the journal on (re)start to rebuild the map
        override def receiveRecover: Receive = {
            case NewEventSaved(payload) =>
                eventMap = eventMap + (payload.eventId -> payload)
            case EventHandled(eventId) =>
                eventMap = eventMap - eventId
        }

        override def receiveCommand: Receive = {
            case SaveNewEvent(payload) =>
                // Journal the fact first, then add the new event to the map
                persist(NewEventSaved(payload)) { _ =>
                    eventMap = eventMap + (payload.eventId -> payload)
                }
            case HandleEvent(eventId) =>
                // Unknown ids are ignored
                eventMap.get(eventId).foreach { event =>
                    persist(EventHandled(eventId)) { _ =>
                        // Do stuff with the event here. EventHandled is already
                        // journaled at this point, so a crash during processing
                        // means the event is not restored on recovery.
                        // Remove the event from the map
                        eventMap = eventMap - eventId
                    }
                }
        }

        // Must be unique and stable across restarts
        override def persistenceId: String = "PersistentMapActor"
    }
    
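    object PersistentMapActor {

        // Domain event stored in the map
        final case class Event(eventId: Int, someField: String)

        // Commands sent to the actor
        final case class SaveNewEvent(payload: Event)
        final case class HandleEvent(eventId: Int)

        // Facts written to the journal and replayed on recovery
        final case class NewEventSaved(payload: Event)
        final case class EventHandled(eventId: Int)
    }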
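
The recovery behaviour described above can be checked without an actor system. The following is a minimal sketch in plain Scala, assuming the same state transitions as `receiveRecover`; `ReplaySketch`, `Persisted`, `applyPersisted`, `replay`, and the sample journal are hypothetical names introduced for illustration, and the case classes are redeclared so the block stands on its own.

```scala
// Pure-Scala sketch of journal replay (no Akka dependency).
// ReplaySketch, Persisted, applyPersisted and replay are hypothetical names.
object ReplaySketch {

  final case class Event(eventId: Int, someField: String)

  // The two kinds of facts written to the journal
  sealed trait Persisted
  final case class NewEventSaved(payload: Event) extends Persisted
  final case class EventHandled(eventId: Int) extends Persisted

  // Same state transition as receiveRecover, written as a pure function
  def applyPersisted(state: Map[Int, Event], persisted: Persisted): Map[Int, Event] =
    persisted match {
      case NewEventSaved(payload) => state + (payload.eventId -> payload)
      case EventHandled(eventId)  => state - eventId
    }

  // Rebuilds the map by folding over the journal in order
  def replay(journal: Seq[Persisted]): Map[Int, Event] =
    journal.foldLeft(Map.empty[Int, Event])(applyPersisted)

  // Sample journal: event 1 is still open, event 2 was saved and then handled
  val journal: Seq[Persisted] = Seq(
    NewEventSaved(Event(1, "begin")),
    NewEventSaved(Event(2, "single")),
    EventHandled(2)
  )

  // Only event 1 remains after replay
  val recovered: Map[Int, Event] = replay(journal)
}
```

Keeping the transition in one pure function like this would also let `receiveRecover` and the `persist` callbacks share it, so the live state and the recovered state cannot drift apart. Note that handled events are only removed from the in-memory map here; their journal entries still exist and are replayed on every recovery.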