Unit testing Kotlin's ConflatedBroadcastChannel behavior


In the new project I'm currently working on I have no RxJava dependency at all, because so far I haven't needed one: coroutines solve the threading problem quite gracefully.

At this point I stumbled upon a requirement for BehaviorSubject-like behavior, where one can subscribe to a stream of data and receive the latest value upon subscription. As I've learned, channels provide very similar behavior in Kotlin, so I decided to give them a try.
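
To make the required behavior concrete, here is a minimal standard-library sketch of "replay the latest value on subscription". `LatestValueSubject`, `subscribe`, and `emit` are hypothetical names invented for this illustration; this is neither RxJava's BehaviorSubject nor anything from kotlinx.coroutines, and it is not thread-safe.

```kotlin
// Hypothetical stand-in that illustrates BehaviorSubject-like semantics only.
// Single-threaded sketch: no synchronization, no unsubscribe support.
class LatestValueSubject<T> {
    private var hasValue = false
    private var latest: T? = null
    private val observers = mutableListOf<(T) -> Unit>()

    // Registers an observer and immediately replays the latest value, if any.
    fun subscribe(observer: (T) -> Unit) {
        observers.add(observer)
        if (hasValue) {
            @Suppress("UNCHECKED_CAST")
            observer(latest as T)
        }
    }

    // Stores the value as the new latest one and notifies current observers.
    fun emit(value: T) {
        latest = value
        hasValue = true
        observers.forEach { it(value) }
    }
}

fun main() {
    val subject = LatestValueSubject<String>()
    subject.emit("A")
    subject.emit("B") // "B" replaces "A" as the latest value

    val seen = mutableListOf<String>()
    subject.subscribe { seen.add(it) } // late subscriber receives "B" right away
    subject.emit("C")

    println(seen) // prints [B, C]
}
```

The late subscriber never sees "A", only the most recent value at subscription time plus everything emitted afterwards.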

From this article I've learned that ConflatedBroadcastChannel is the type of channel that mimics BehaviorSubject, so I declared the following:

class ChannelSender {

    val channel = ConflatedBroadcastChannel<String>()

    fun sendToChannel(someString: String) {
        GlobalScope.launch(Dispatchers.Main) { channel.send(someString) }
    }
}
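
The replay-on-subscribe behavior of the channel can be checked in isolation with a small sketch like the one below. It assumes a kotlinx.coroutines version in which ConflatedBroadcastChannel is still available (newer releases deprecate it), and it uses `runBlocking` instead of `Dispatchers.Main` so that it runs outside Android.

```kotlin
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val channel = ConflatedBroadcastChannel<String>()

    // send() on a conflated channel does not wait for subscribers;
    // each value simply replaces the previous one.
    channel.send("A")
    channel.send("B")

    // A subscription opened afterwards starts with the latest value.
    val subscription = channel.openSubscription()
    println(subscription.receive()) // prints B

    // Values sent after subscribing are delivered as well.
    channel.send("C")
    println(subscription.receive()) // prints C

    subscription.cancel()
}
```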

For listening to the channel I do this:


    class ChannelListener(val channelSender: ChannelSender) {
        // consumeEach is a suspending call, so this function has to be suspend as well.
        suspend fun listenToChannel() {
            channelSender.channel.consumeEach { someString ->
                if (someString == "A") foo.perform()
                else bar.perform()
            }
        }
    }

This works as expected, but at this point I'm having difficulties understanding how to unit test ChannelListener.

I've tried to find something related here, but none of the example-channel-**.kt classes were helpful.

Any help, suggestions, or corrections of my incorrect assumptions are appreciated. Thanks.


Solution

  • With Alexey's help I ended up with the following code, which answers the question:

    class ChannelListenerTest {
    
      private val channelSender: ChannelSender = mock()
    
      private val sut = ChannelListener(channelSender)
      private val broadcastChannel = ConflatedBroadcastChannel<String>()
    
      private val timeLimit = 1_000L
      private val endMarker = "end"
    
      @Test
      fun `some description here`() = runBlocking {
        whenever(channelSender.channel).thenReturn(broadcastChannel)
    
        val sender = launch(Dispatchers.Default) {
          broadcastChannel.offer("A")
          yield()
        }
    
        val receiver = launch(Dispatchers.Default) {
          while (isActive) {
            val i = waitForEvent()
            if (i == endMarker) break
            yield()
          }
        }
    
        try {
          withTimeout(timeLimit) {
            sut.listenToChannel()
            sender.join()
            broadcastChannel.offer(endMarker) // last event, signals the receiver to terminate
            receiver.join()
          }
          verify(foo).perform()
        } catch (e: CancellationException) {
          println("Test timed out $e")
        }
      }
    
      private suspend fun waitForEvent(): String =
          with(broadcastChannel.openSubscription()) {
            val value = receive()
            cancel()
            value
          }
    
    }
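
One way to reduce how much coroutine machinery a test like this needs is to pull the branching out of the `consumeEach` body into a plain, non-suspending class and test that on its own. The sketch below uses only the standard library. `Action`, `StringHandler`, and `CountingAction` are hypothetical names, and `Action` merely stands in for whatever types `foo` and `bar` have in the real code.

```kotlin
// Hypothetical action type; stands in for the real types of `foo` and `bar`.
interface Action {
    fun perform()
}

// Hypothetical helper that holds only the decision made inside consumeEach.
class StringHandler(private val foo: Action, private val bar: Action) {
    fun handle(someString: String) {
        if (someString == "A") foo.perform() else bar.perform()
    }
}

// Hand-written fake that counts calls, so no mocking library is required.
class CountingAction : Action {
    var calls = 0
        private set

    override fun perform() {
        calls++
    }
}

fun main() {
    val foo = CountingAction()
    val bar = CountingAction()
    val handler = StringHandler(foo, bar)

    handler.handle("A")
    handler.handle("B")
    handler.handle("A")

    println("foo=${foo.calls}, bar=${bar.calls}") // prints foo=2, bar=1
}
```

With this split, the listener would only forward each received value to `handle`, so the channel-based test has to cover just the subscription wiring.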