Search code examples
springmongodbkotlinchangestream

How use SchedulerLock for MongoChangeStream? Are there analogues?


I have am spring mongodb application, where I am using MongoChangeStream for some business logic. But if there are several instances, the business logic is duplicated as many instances. So I decided to try using @SchedulerLock from net.javacrumbs.shedlock. But it did not help, the collection shedLockwas not created either. The application works the same as if I did not add @SchedulerLock.

@Component
class BookChangeStreamListener(
    mongoTemplate: MongoTemplate
) : MongoChangeStreamListener<BookDocument>(mongoTemplate) {

    override fun createMessageListener() =
        createMessageListener(
            object : CallbackChangeStream<BookDocument> {
                @Scheduled(fixedDelayString = "1s")
                @SchedulerLock(
                    name = "BookChangeStreamListenerInsert",
                    lockAtMostFor = "5m",
                    lockAtLeastFor = "5m"
                )
                override fun insert(raw: ChangeStreamDocument<Document>, body: BookDocument?) {
                    body?.let {
                        // business logic
                    }
                }
            }
}

config:

import com.mongodb.client.MongoClient
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.mongo.MongoLockProvider
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock
import org.springframework.boot.autoconfigure.mongo.MongoProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableScheduling

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "5m")
class LockConfig {
    @Bean
    fun lockProvider(mongo: MongoClient, properties: MongoProperties): LockProvider {
        return MongoLockProvider(mongo.getDatabase(properties.database))
    }
}

build.gradle.kts:

implementation("net.javacrumbs.shedlock:shedlock-provider-mongo:4.34.0")
implementation("net.javacrumbs.shedlock:shedlock-spring:4.34.0")

Solution

  • @SchedulerLock cannot be used for anonymous class methods. Final solution that works for me:

    @Component
    class BoookChangeStreamListener(
        mongoTemplate: MongoTemplate,
        @Qualifier("BookCallbackChangeStream")
        val callbackChangeStream: CallbackChangeStream<BookDocument>
    ) : MongoChangeStreamListener<BookDocument>(mongoTemplate) {
    
        override fun createMessageListener() = createMessageListener(callbackChangeStream)
    }
    
    @Component("bookCallbackChangeStream")
    class BookCallbackChangeStreamImpl(
        val eventPublisher: ApplicationEventPublisher,
        val mapper: BookToDomainMapper
    ) : CallbackChangeStream<BookDocument> {
    
        @SchedulerLock(
            name = "BookChangeStreamListenerInsert",
            lockAtMostFor = "10ms",
            lockAtLeastFor = "10ms"
        )
        override fun insert(raw: ChangeStreamDocument<Document>, body: BookDocument?) {
            // business logic
        }
    }