Search code examples
kotlinapache-camel

camel message ends up at wrong SEDA endpoint


I'm having a bit of confusion with camels SEDA endpoints here. Obviously there's things I don't understand, but so far I couldn't figure out what it is.

Here's my setup (in the abstract):

from("direct:some-entry-point")
    .setBody {
        // Route sets a byte array
        myByteArray
    }
    .to("seda:file-upload")     // The message goes on to upload the byte array
    .choice()
        .`when` { doTheOtherThing }   // Under certain circumstances, something needs to be done additionally. The body can already be uploaded, however.
        .to("direct:do-the-other-thing")


from("direct:do-the-other-thing")
    .setBody {
        // This route adds a file handle, because this one is big and won't fit into memory
       myFileHandle
    }
    .to("seda:streaming-upload")  //Since this file is big, it is forwarded to another endpoint to be uploaded as a stream.

from("seda:streaming-upload?queue=#FiloBlockingQueue")
    .process {
        // Opens a stream to the filehandle in the body and uploads from that.
    }

from("seda:file-upload?queue=#FiloBlockingQueue")
    .process {
         // Uploads the byte array in the body
    }


All in all, nothing too complex I would think. But I'm having an issue. For some wild reason, when some-entry-point sends the message to the file-upload endpoint, instead it ends up at the streaming-upload endpoint, where it will crash because it was expecting a file handle and got a byte array.

I have done all I can with logging and breakpoints, and for all I can tell, this is indeed what happens. The message never arrives at file-upload, and it never arrives at do-the-other-thing, even though the conditions for that to happen are given. I've also verified that do-the-other-thing is the only route that forwards to streaming-upload. The only way I can explain the behaviour is that the message forwarded to file-upload ends up at the wrong endpoint. Can anybody tell me why?

EDIT: Some new insights: The behaviour of messages being sent to the wrong queue is not consistent. In fact, it appears to be entirely random. I send a messages to any SEDA-queue endpoint, it's a cointoss which one will actually receive the message. Judging by the behaviour, I suspect that I'm completely misunderstanding hte underlying architecture here. It appears that there's only one SEDA-queue instance in my application, and the endpoints are both polling the same one. The first one to poll is the one that gets the message. That is not the behaviour I'd expect intuitively from something named "endpoint", but it would explain a lot. Any people here that actually know camel that can explain this to me?


Solution

  • It turns out that both those endpoints were backed by the same queue. My understanding of how camel works was woefully inadequate. When sending something to a SEDA-endpoint, it gets put on the designated queue, and SEDA endpoints will poll their queue at certain intervals. If two endpoints share the same queue, you have no control over which one will be first and receive the message.

    So if you want two different endpoints that are actually independent endpoints, you need to give them their own queue by making a bean for it and then injecting that into camel, like so:

    @Configuration
    class QueueConfig(
        @Value("\${capture.queueSize}") private val queueSize: Int,
        private val eventPublisher: ApplicationEventPublisher
    ) {
        @Bean("FirstBlockingQueue")
        fun createFirstQueue(): BlockingQueue<Exchange> =
            TODO("instantiate queue here")
    
        @Bean("SecondBlockingQueue")
        fun createSecondQueue(): BlockingQueue<Exchange> =
            TODO("instantiate other queue here")
    
    }
    

    In the route builder:

    from("seda:first-enpoint?queue=#FirstBlockingQueue")
        // do things
    
    from("seda:second-endpoint?queue=#SecondBlockingQueue")
        // do things
    

    Now both endpoints are backed by their own queue, and will not steal each others messages.