I have the following setup:
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@RestController
@RequestMapping("/api/{orderToken}")
class TestingController {
    companion object {
        private val store = HashMap<String, SseEmitter>()
    }

    @GetMapping("/add", produces = ["text/event-stream"])
    fun getOrderStatus(
        @PathVariable orderToken: String,
    ): SseEmitter {
        val sseEmitter = SseEmitter()
        store[orderToken] = sseEmitter
        return sseEmitter
    }

    @GetMapping("/notification", produces = ["text/event-stream"])
    fun simulateKafkaEvent(
        @PathVariable orderToken: String,
    ) {
        val sseEmitter = store[orderToken]
        sseEmitter!!.send("order token: $orderToken")
    }
}
The idea is as follows:
- /api/{orderToken}/add is the endpoint that sets up an SSE connection.
- /api/{orderToken}/notification is the endpoint that sends a message to the client using the stored SseEmitter.

This works! Running the app, invoking the /add endpoint from two different terminals with two different order tokens, and then invoking the /notification endpoint from a third terminal sends the expected message to the expected terminal.
But I want an automated test for it. Ultimately I need a mechanism to subscribe/listen to the returned SseEmitter, but I haven't found a way to do it.
This is what the test looks like. It passes, but for the wrong reasons: it consumes the initial SseEmitter response rather than the follow-up message.
private lateinit var webTestClient: WebTestClient

@BeforeEach
fun setup() {
    webTestClient = WebTestClient.bindToController(TestingController()).build()
}

@Test
fun `test 2`() {
    val addEndpoint = webTestClient
        .get()
        .uri("/api/1234/add")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .exchange()
        .expectStatus().isOk
        .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
        .returnResult(SseEmitter::class.java)
        .responseBody // returnResult gives a FluxExchangeResult; responseBody is its Flux of decoded elements

    // Trigger the follow-up message for the same order token
    val notification = webTestClient.get()
        .uri("/api/1234/notification")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .exchange()
        .returnResult(String::class.java)
        .responseBody

    StepVerifier.create(addEndpoint)
        .consumeNextWith { println("notification: $it") }
        .thenCancel()
        .verify()
}
For future visitors: this is the solution I came up with, after some help from https://stackoverflow.com/a/70018505/8016773.
fun test() {
    ...
    val type = object : ParameterizedTypeReference<ServerSentEvent<String>>() {}
    webClient.get()
        .uri("/api/1234/add")
        .retrieve()
        .bodyToFlux(type)
        .onErrorResume {
            println("exception with subscription: $it")
            Mono.empty()
        }.subscribe {
            println("success: $it")
            result = it.data()!!
            countdownLatch.countDown()
        }
    ...
    // invoke some function to send a message here.
    countdownLatch.await()
    // do the asserts for the result here
}
The countdownLatch ensures that the test waits until the subscriber has received an event and counted the latch down before it moves on to the assertions.
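
One caveat with a bare await() is that the test hangs forever if no event ever arrives. The following is a minimal, Spring-free sketch of the same latch pattern with a timeout. The names subscribeToFakeStream and awaitFirstEvent are hypothetical stand-ins for the WebClient subscription, not part of any library; only CountDownLatch, AtomicReference and kotlin.concurrent.thread are real APIs.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical stand-in for the SSE subscription: it invokes the callback
// from another thread, the way a reactive subscriber would.
fun subscribeToFakeStream(onEvent: (String) -> Unit): Thread =
    thread(start = true) {
        Thread.sleep(100) // simulate the delay before the server pushes an event
        onEvent("order token: 1234")
    }

// Waits for the first event, or returns null if none arrives within the timeout.
fun awaitFirstEvent(timeoutMs: Long): String? {
    val latch = CountDownLatch(1)
    val result = AtomicReference<String?>(null)

    subscribeToFakeStream { event ->
        result.set(event)   // store the value before releasing the latch
        latch.countDown()
    }

    // await(timeout, unit) returns false if the timeout elapses first,
    // so a missing event fails the test instead of blocking it forever.
    val arrived = latch.await(timeoutMs, TimeUnit.MILLISECONDS)
    return if (arrived) result.get() else null
}

fun main() {
    println(awaitFirstEvent(timeoutMs = 2_000))
}
```

In the real test, the same idea would presumably amount to replacing countdownLatch.await() with the timed overload and asserting that it returned true before checking the result.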