
Long living service with coroutines


I want to create a long-living service that can handle events. It receives events via postEvent, stores them in a repository (backed by a database), and sends a batch of them to the API once enough events have accumulated.

I'd also like to be able to shut it down on demand, and I would like to unit test this service.

This is what I have come up with so far. I'm currently struggling with unit testing it: either the database is shut down prematurely after events are sent to the service via fixture.postEvent(), or the test itself ends up in some sort of deadlock (I was experimenting with various context + job configurations).

What am I doing wrong here?

class EventSenderService(
        private val repository: EventRepository,
        private val api: Api,
        private val serializer: GsonSerializer,
        private val requestBodyBuilder: EventRequestBodyBuilder,
) : EventSender, CoroutineScope {

    private val eventBatchSize = 25

    val job = Job()
    private val channel = Channel<Unit>()

    init {
        job.start()

        launch {
            for (event in channel) {
                val trackingEventCount = repository.getTrackingEventCount()

                if (trackingEventCount < eventBatchSize) continue

                readSendDelete()
            }
        }
    }

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    override fun postEvent(event: Event) {
        launch(Dispatchers.IO) {
            writeEventToDatabase(event)
        }
    }

    override fun close() {
        channel.close()
        job.cancel()
    }

    private fun readSendDelete() {
        try {
            val events = repository.getTrackingEvents(eventBatchSize)

            val request = requestBodyBuilder.buildFor(events).blockingGet()

            api.postEvents(request).blockingGet()

            repository.deleteTrackingEvents(events)
        } catch (throwable: Throwable) {
            Log.e(throwable)
        }
    }

    private suspend fun writeEventToDatabase(event: Event) {
        try {
            val trackingEvent = TrackingEvent(eventData = serializer.toJson(event))
            repository.insert(trackingEvent)
            channel.send(Unit)
        } catch (throwable: Throwable) {
            throwable.printStackTrace()
            Log.e(throwable)
        }
    }
}

Test

@RunWith(RobolectricTestRunner::class)
class EventSenderServiceTest : CoroutineScope {

    @Rule
    @JvmField
    val instantExecutorRule = InstantTaskExecutorRule()

    private val api: Api = mock {
        on { postEvents(any()) } doReturn Single.just(BaseResponse())
    }
    private val serializer: GsonSerializer = mock {
        on { toJson<Any>(any()) } doReturn "event_data"
    }
    private val bodyBuilder: EventRequestBodyBuilder = mock {
        on { buildFor(any()) } doReturn Single.just(TypedJsonString.buildRequestBody("[ { event } ]"))
    }
    val event = Event(EventName.OPEN_APP)

    private val database by lazy {
        Room.inMemoryDatabaseBuilder(
                RuntimeEnvironment.systemContext,
                Database::class.java
        ).allowMainThreadQueries().build()
    }

    private val repository by lazy { database.getRepo() }

    val fixture by lazy {
        EventSenderService(
                repository = repository,
                api = api,
                serializer = serializer,
                requestBodyBuilder = bodyBuilder,
        )
    }

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + fixture.job

    @Test
    fun eventBundling_success() = runBlocking {

        (1..40).map { Event(EventName.OPEN_APP) }.forEach { fixture.postEvent(it) }

        fixture.job.children.forEach { it.join() }

        verify(api).postEvents(any())
        assertEquals(15, repository.getTrackingEventCount())
    }
}

After updating the code as @Marko Topolnik suggested, by adding fixture.job.children.forEach { it.join() }, the test never finishes.


Solution

  • One thing you're doing wrong is related to this:

    override fun postEvent(event: Event) {
        launch(Dispatchers.IO) {
            writeEventToDatabase(event)
        }
    }
    

    postEvent launches a fire-and-forget async job that will eventually write the event to the database. Your test creates 40 such jobs in rapid succession and, while they're queued, asserts the expected state. I can't work out, though, why you assert 15 events after posting 40.

    To fix this issue you should use the line you already have:

    fixture.job.join()
    

    but change it to

    fixture.job.children.forEach { it.join() }
    

    and place it lower, after the loop that creates the events.


    I failed to take into account the long-running consumer job you launch in the init block. That job only completes when the channel is closed, so joining all children of the master job never returns. This invalidates the advice I gave above.

    Instead, you'll have to make a few more changes. Make postEvent return the job it launches, collect all these jobs in the test, and join them. This is more selective and avoids joining the long-living job.
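
    A minimal sketch of that change, not your actual service: SketchEventSender is a hypothetical, simplified stand-in in which an in-memory queue replaces the repository and "sending" only records the batch size. The point is just that postEvent returns the Job and the caller joins exactly those jobs.

    ```kotlin
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.cancel
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.joinAll
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import java.util.concurrent.ConcurrentLinkedQueue

    // Hypothetical stand-in for EventSenderService (assumption, not the real class).
    class SketchEventSender(private val batchSize: Int = 25) {
        private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
        private val signal = Channel<Unit>() // rendezvous channel, as in the question
        val stored = ConcurrentLinkedQueue<String>()   // stands in for the database
        val sentBatchSizes = ConcurrentLinkedQueue<Int>() // stands in for the API

        init {
            // Long-living consumer: it only completes when the channel is closed,
            // so a test must not try to join it.
            scope.launch {
                for (unit in signal) {
                    if (stored.size < batchSize) continue
                    repeat(batchSize) { stored.poll() }
                    sentBatchSizes.add(batchSize)
                }
            }
        }

        // Returns the Job of this single write so that callers can join it.
        fun postEvent(event: String): Job = scope.launch {
            stored.add(event)
            signal.send(Unit)
        }

        fun close() {
            signal.close()
            scope.cancel()
        }
    }

    fun main() = runBlocking {
        val sender = SketchEventSender()
        val writes = (1..40).map { sender.postEvent("event $it") }
        writes.joinAll() // joins only the 40 writes, never the consumer
        println("all writes completed: ${writes.all { it.isCompleted }}")
        println("events still stored: ${sender.stored.size}")
        sender.close()
    }
    ```

    Joining the writes only guarantees that every write has finished and handed its signal to the consumer. The consumer may still be processing the last signal at that moment, so with other event counts or a buffered channel the test may need an additional way to wait for the consumer.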


    As a separate issue, your batching approach isn't ideal because it always waits for a full batch before doing anything. During a lull with no incoming events, the events already stored will sit in the incomplete batch indefinitely.

    The best approach is natural batching, where you keep eagerly draining the input queue. When there's a big flood of incoming events, the batch will naturally grow, and when they are trickling in, they'll still be served right away. You can see the basic idea here.
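
    One way natural batching could look with a channel, as a rough sketch only. The helper name receiveBatch is hypothetical, the events are assumed to travel through the channel itself (here a Channel.UNLIMITED buffer of strings), and tryReceive requires kotlinx.coroutines 1.5 or later.

    ```kotlin
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.runBlocking

    // Hypothetical helper: suspends until at least one element is available,
    // then drains whatever else is already queued, up to maxBatchSize,
    // without waiting for more elements to arrive.
    // receive() throws ClosedReceiveChannelException once the channel is
    // closed and empty, which a real consumer loop would have to handle.
    suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(maxBatchSize: Int): List<T> {
        val batch = mutableListOf(receive())
        while (batch.size < maxBatchSize) {
            val next = tryReceive().getOrNull() ?: break
            batch.add(next)
        }
        return batch
    }

    fun main() = runBlocking {
        val events = Channel<String>(Channel.UNLIMITED)

        // A flood of events: the batch grows up to the limit.
        repeat(40) { events.trySend("event $it") }
        println(events.receiveBatch(25).size) // 25
        println(events.receiveBatch(25).size) // 15

        // A trickle: a single event is served right away as a batch of one.
        events.trySend("late event")
        println(events.receiveBatch(25).size) // 1
    }
    ```

    In the service, the consumer loop would call something like receiveBatch and send each batch immediately, so an incomplete batch never waits for later events.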