Search code examples
javascriptnode.jseventstoredb

How Event Store persistent subscription are working?


I'm currently working on a project where we use Event Store as a write store for our events. Appending events works well, the problem comes when we want to listen for those event.

To listen to event store events, we use the official Event Store Node.js client npm i @eventstore/db-client.

We did create a persistent subscription in the Admin UI. To connect to this subscription we use the eventStoreClient.connectToPersistentSubscription and it connects correctly.

The problem is that our events are replayed for event. In fact they get stuck in the parked messages list.

Is the replaying of events a normal behaviour ? How the event store can remember the events it gave to the Node.js client ?

Note: The events are currently being played and our projection are constructed the right way but the event are replayed for ever.

The code we use to listen to events

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
            bufferSize: 10,
        }, {

        })

        for await (const e of stream) {
            const { event, commitPosition, link } = e
            // console.log(event, commitPosition, link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized, we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                await stream.ack(event.id)
                // this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
            } catch (error) {
                await stream.nack(RETRY, error.message, event.id)
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
            }
        }

Our event store logs

{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":609,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}

The persistent subscription configuration

enter image description here

EDIT: We changed our script to acknoledge the linkto event instead of the linked event as $ce-precontrol is a stream of linkto.

Our events are not retried for ever now.

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
            bufferSize: 10,
        }, {

        })

for await (const e of stream) {
            const { event, commitPosition, link } = e
            // console.log(event, commitPosition, link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized, we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                await stream.ack(link?.id || '')
                this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
            } catch (error) {
                await stream.nack(RETRY, error.message, link?.id || '')
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
            }
        }


Solution

  • My events were retry for ever. I was listening on a stream made of link tos. I configured the persistent subscription to resolve the link tos so when I was accessing event.id I was getting the linked event id.

    I was using this linked event id to ack the event instead of calling ack with the link to event id.

    Because my events were not ack, they did time out and got retry until they reached the max retry count.

    
    for await (const e of stream) {
                const { event, commitPosition, link } = e
                // console.log(event, commitPosition, link)
    
                if (!event) {
                    this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                    return
                }
    
                // We should map the event instead of casting it.
                // Every field of an event which are instance of classes won't be correctly
                // unserialized, we need to re-instatiate those classes.
                const eventBody = event.data as Event
                console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)
    
                this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
    
                try {
                    await this.eventHandler.dispatch(eventBody)
                    this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                    // Using the link to event id to ack
                    await stream.ack(link?.id || '')
                    this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
                } catch (error) {
                    await stream.nack(RETRY, error.message, link?.id || '')
                    this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
                }
            }