Search code examples
springkotlinautomated-testsspring-webfluxserver-sent-events

How do I test an SseEmitter send event within kotlin


I have the following setup

import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@RestController
@RequestMapping("/api/{orderToken}")
class TestingController {

    companion object {
        private val store = HashMap<String, SseEmitter>()
    }

    @GetMapping("/add", produces = ["text/event-stream"])
    fun getOrderStatus(
        @PathVariable orderToken: String,
    ): SseEmitter {
        val sseEmitter = SseEmitter()
        store[orderToken] = sseEmitter
        return sseEmitter
    }

    @GetMapping("/notification", produces = ["text/event-stream"])
    fun simulateKafkaEvent(
        @PathVariable orderToken: String,
    ) {
        val sseEmitter = store[orderToken]
        sseEmitter!!.send("order token: $orderToken")
    }
}

The idea here being as follows

  • Client invokes the /api/{orderToken}/add endpoint to set up a SSE Connection.
  • This connection gets stored in a map.
  • Something else invokes the /api/{orderToken}/notification endpoint to send a message to the client using the SseEmitter.

This works! Running the app and invoking the /add endpoint from two different terminals with two different order tokens and then invoking the /notification endpoint from a third terminal sends the expected message to the expected terminal.

But I want to have an automated test for it. Ultimately I need a mechanism to subscribe/listen to the returned SseEmitter but I haven't found a way to do it.

This is what the test looks like. It passes.. but for the wrong reasons. What it's doing is consuming the first SseEmitter rather than the follow up message.

    @BeforeEach
    fun setup() {
        webTestClient = WebTestClient.bindToController(TestingController()).build()
    }

    @Test
    fun `test 2`() {
        val addEndpoint = webTestClient
            .get()
            .uri("/api/1234/add")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk
            .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
            .returnResult(SseEmitter::class.java)
            .responseBody  //It results FluxExchangeResult<ServerSentEvent<*>>

        val notification = webTestClient.get()
            .uri("/api/1234/kafka")
            .accept()
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .returnResult(String::class.java)
            .responseBody

        StepVerifier.create(addEndpoint).consumeNextWith { println("notification: " + s)}.thenCancel().verify()
    }

Solution

  • For future visitors this is the solution that I came up with after some help from https://stackoverflow.com/a/70018505/8016773.

    fun test() {
    ...
      val type = object : ParameterizedTypeReference<ServerSentEvent<String>>() {}
    
      webClient.get()
        .uri("/api/1234/add")
        .retrieve()
        .bodyToFlux(type)
        .onErrorResume {
            println("exception with subscription: $it")
            Mono.empty()
        }.subscribe {
            println("success: $it")
            result = it.data()!!
            countdownLatch.countDown()
        }
      ...
      // invoke some function to send a message here. 
    
      countdownLatch.await()
    
      // do the asserts for the result here
    }
    

    The countdownLatch ensures that the test waits for the latch to be invoked before completing the test.