Search code examples
c#f#rabbitmqeasynetq

EasyNetQ Wait for event from ISubscriptionResult


I am using EasyNetQ for mamanging my RabbitMq messenging bus using f# well its quite simple this one publishes messenges:

let HandleBusResponse (data:BaseFrame) (bus:IBus) : ISubscriptionResult = 
    let handler = Action<RequestMessage>(fun request ->
        let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
            match request.FrameType with 
                  | FrameType.UNDEFINED -> 
                    let response = ResponseService.BusRequestFactory request data
                  | _ -> 
                    let response = ResponseService.BusRequestFactory request data 
    bus.Publish<ResponseMessage>(response, cabinetSubscribeId))

And this method(subscriber) is called somewhere in application like:

HandleBusResponse data bus
//WAIT FOR MESSAGE ARRIVAL
informationPools.AddToConnectionPool(data.ProjectId, client) |> ignore 
ClientInfoHandler client data |> Async.RunSynchronously|> ignore 

Basically the same way it works in c#.

Now this code is out of subscriber handler, my question is is it possible to detect from this part

//WAIT FOR MESSAGE ARRIVAL

using some sort of while if anything new arrived (remembering that second code snippet is outside subscribe handler)?


Solution

  • If you could post a complete minimal example that compiled and worked in F# Interactive, it would be easier to directly implement the solution for your problem. However, one solution that should work in general would be awaiting events. In F#, you can create events like this:

    let requestArrived = Event<RequestMessage>()
    let responsePublished = Event<ResponseMessage>()
    

    Then, you can asynchronously await those events and handle the data that they produce:

    let rec messageLoop<'message> f (event: IEvent<'message>) = 
        async {
            let! message = event |> Async.AwaitEvent 
            message |> f
            return! messageLoop f event
        }
    

    So, in your // WAIT FOR MESSAGE ARRIVAL block, you could do something like this (depending on whether you want to handle requests or responses, or what you want to do with them:

    requestArrived.Publish |> messageLoop (fun request -> printfn "Received Request: %A" request)
    // AND/OR
    responsePublished.Publish |> messageLoop (fun response -> printfn "Sent Response: %A" response)
    

    Then, you can update your handler to trigger the events as requried:

        let handler = Action<RequestMessage>(fun request ->        
            requestArrived.Trigger(request)
            let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
            let response = 
                match request.FrameType with 
                | FrameType.UNDEFINED -> 
                    ResponseService.BusRequestFactory request data
                | _ -> 
                    ResponseService.BusRequestFactory request data 
            bus.Publish<ResponseMessage>(response, cabinetSubscribeId)
            responsePublished.Trigger(response))