Search code examples
wcff#faultchannelfactory

WCF F# - Handling Service Shutdown Gracefully on Client


I have some experimental code basically just trying to make a simple scenario work. I have one client that is streaming data to multiple services. The problem I have is that if one of the services does not shutdown gracefully I get an EndpointNotFoundException which I can't seem to handle. Below is my attempt at handling this which is failing. In reality I would like to remove the failed service channel from the list of channels and continue on streaming data to the services which are still up and running. The timer stuff simply gives the services a chance to start up before the data streaming starts.

let prices = returns a seq of data that is streamed.

type ReplayDataStream(prices) =
  let evt = new Event<_>()
  member x.Replay() = 
                    async { for line, delay in prices do
                                do! Async.Sleep(delay)
                                evt.Trigger(line) }
                                |> Async.StartImmediate

  member x.PriceChanged = evt.Publish


let main() =
    let addresses = new ResizeArray<EndpointAddress>()

    let announcementService = new AnnouncementService()

    let createChannels addresses =
        let channels = new ResizeArray<IInputDataService>()
        for (address:EndpointAddress) in addresses do
                let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
                let channel = channelFactory.CreateChannel()
                (channel :?> ICommunicationObject).Faulted.Add(fun x -> 
                                                                        (channel :?> ICommunicationObject).Abort()
                                                                        channels.Remove(channel) |> ignore
                                                               )
                channels.Add(channel)
        channels

    let sendMessage(args:ElapsedEventArgs) =
        let channels = createChannels addresses
        for financialDataStream in prices do
        let replayDataStreamA = new ReplayDataStream(financialDataStream)
        for channel in channels do
            try
            //This is where it blows up and the try block isn't catching the exception.
            replayDataStreamA.PriceChanged.Add(channel.InputStringData)
            with
            | :? EndpointNotFoundException as ex -> Console.WriteLine(ex.ToString())
            | :? CommunicationException as ex -> Console.WriteLine(ex.ToString())
            | :? Exception as ex -> Console.WriteLine(ex.ToString())
            replayDataStreamA.Replay()

    let timer = new System.Timers.Timer()
    timer.Enabled <- true
    timer.AutoReset <- false
    timer.Interval <- 30000.0
    timer.Start()
    timer.Elapsed.Add(sendMessage)

    announcementService.OnlineAnnouncementReceived.Add(fun e -> 
                                                                Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
                                                                addresses.Add(e.EndpointDiscoveryMetadata.Address)
                                                                )

    announcementService.OfflineAnnouncementReceived.Add(fun e -> 
                                                                Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
                                                                addresses.Remove(e.EndpointDiscoveryMetadata.Address) |> ignore
                                                                )

    let announcementServiceHost = new ServiceHost(announcementService)
    try
        announcementServiceHost.AddServiceEndpoint(new UdpAnnouncementEndpoint());
        announcementServiceHost.Open();
    with 
    | :? System.ServiceModel.CommunicationException as ex -> Console.WriteLine(ex.ToString())
    | :? System.TimeoutException as ex -> Console.WriteLine(ex.ToString())


    printfn "%s" "Hit any key to close."
    Console.ReadKey() |> ignore

Solution

  • After rewriting my code in C# it finally dawned on me what I was doing wrong. This is what the PriceChanged event handler should look like. I needed to catch the exception inside the lambda itself. Now I need to write something that actually looks like production code. :)

    replayDataStreamA.PriceChanged.Add( fun x -> 
                                                                try
                                                                channel.InputStringData x
                                                                with 
                                                                | :? System.ServiceModel.CommunicationException as ex -> (channel :?> ICommunicationObject).Abort()
                                                                )
    

    For posterity here is the entire method:

    let sendMessage(args:ElapsedEventArgs) =
                if(addresses.Count > 0) then
                    for address in addresses do
                        let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
                        let channel = channelFactory.CreateChannel()
                        for financialDataStream in prices do
                        let replayDataStreamA = new ReplayDataStream(financialDataStream)
                        replayDataStreamA.PriceChanged.Add( fun x -> 
                                                            try
                                                            channel.InputStringData x
                                                            with 
                                                            | :? System.ServiceModel.CommunicationException as ex -> (channel :?> ICommunicationObject).Abort()
                                                            )
                        replayDataStreamA.Replay()