Compilation error when trying to use topics with EasyNetQ in F#


I am trying to subscribe to a RabbitMQ topic using the PubSub.Subscribe method in EasyNetQ with F#. The function subscribeToAppQueueWithoutTopic compiles and works, but subscribeToAppQueueWithTopic will not compile at all.

let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
    bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived, 
    (fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue")), cts.Token)

Error   FS0041  No overloads match for method 'Subscribe'.
Known types of arguments: string * (AppEnvelope -> unit) * (ISubscriptionConfiguration -> ISubscriptionConfiguration) * CancellationToken

Available overloads:
 - (extension) IPubSub.Subscribe<'T>(subscriptionId: string, onMessage: Action<'T>, configure: Action<ISubscriptionConfiguration>, ?cancellationToken: CancellationToken) : ISubscriptionResult // Argument 'configure' doesn't match
 - (extension) IPubSub.Subscribe<'T>(subscriptionId: string, onMessage: Func<'T,CancellationToken,Tasks.Task>, configure: Action<ISubscriptionConfiguration>, ?cancellationToken: CancellationToken) : ISubscriptionResult // Argument 'onMessage' doesn't match

I found a C# example of subscribing with a topic in the EasyNetQ subscription tests, which looks like this:

    bus.PubSub.SubscribeAsync<Message>(
        Guid.NewGuid().ToString(),
        firstTopicMessagesSink.Receive,
        x => x.WithTopic("first"),
        cts.Token);

I thought I could use fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue") as the F# equivalent. Alas, this will not compile.

Here is an example app showing the problem

open System
open EasyNetQ
open System.Threading

type Subscription<'T> = { OnMessageReceived: 'T -> unit }

[<Queue("appqueue", ExchangeName = "demopappexchange")>]
type AppEnvelope = { Message : obj }  

[<EntryPoint>]
let main argv =

    let bus = RabbitHutch.CreateBus("host=localhost")

    let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    let printMessage message = 
        printfn "%s" message

    let subscription = {  
        OnMessageReceived = fun (envelope: AppEnvelope) -> (envelope.Message.ToString() |> printMessage )
     }
      
    let sendToAppWithTopic message = 
        async {              
                do! bus.PubSub.PublishAsync({AppEnvelope.Message = message}, "app.queue") |> Async.AwaitTask  
                // bus.Dispose()
            } |> Async.Start
  
    let subscribeToAppQueueWithoutTopic (callback : Subscription<AppEnvelope>) =
        printfn "subscribe called"
        bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived)

    (*  ** Will not compile **
    let subscribeAsyncToAppQueueWithTopic = 
        async {
            do! bus.PubSub.SubscribeAsync<AppEnvelope>(String.Empty, callback.OnMessageReceived, 
                                                       fun (x: ISubscriptionConfiguration) -> x.WithTopic "scanservice.queue")
                                                       |> Async.AwaitTask
    } |> Async.Start
    *)

    // Will not compile
    let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
        bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived, (fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue")), cts.Token)
   
    subscribeToAppQueueWithoutTopic subscription |> ignore
    sendToAppWithTopic "Testing"

    Console.ReadKey() |> ignore
    0  

Solution

  • I don't know anything about EasyNetQ, but I think the problem here is that WithTopic returns a reference to the mutated configuration. An F# lambda that returns a value will not convert to Action<ISubscriptionConfiguration>, which needs a lambda returning unit, so you have to explicitly ignore the result, like this:

    let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
        bus.PubSub.Subscribe<AppEnvelope>(
            String.Empty,
            callback.OnMessageReceived,
            (fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue") |> ignore),
            cts.Token)
    

    Apparently, the API returns the configuration in order to provide a fluent C# interface, as this excerpt from the similar IPublishConfiguration interface shows:

    /// <summary>
    /// Allows publish configuration to be fluently extended without adding overloads
    ///
    /// e.g.
    /// x => x.WithTopic("*.brighton").WithPriority(2)
    /// </summary>
    public interface IPublishConfiguration
    {
        /// <summary>
        /// Sets a priority of the message
        /// </summary>
        /// <param name="priority">The priority to set</param>
        /// <returns>Returns a reference to itself</returns>
        IPublishConfiguration WithPriority(byte priority);
    
        /// <summary>
        /// Sets a topic for the message
        /// </summary>
        /// <param name="topic">The topic to set</param>
        /// <returns>Returns a reference to itself</returns>
        IPublishConfiguration WithTopic(string topic);
    }


    From a functional programming perspective, this is a confusing way to do things, but such is life in the C# world, I suppose.
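
To see the same behaviour in isolation, here is a minimal sketch that needs neither EasyNetQ nor a RabbitMQ connection. FakeConfig and FakeBus are hypothetical stand-ins invented for this illustration, not EasyNetQ types; the only thing they share with the real API is a fluent WithTopic method that returns the configuration, and a method that takes an Action<_>.

```fsharp
open System

// Hypothetical fluent configuration type (an assumption, not EasyNetQ's API).
// WithTopic mutates the instance and returns it so that calls can be chained.
type FakeConfig() =
    member val Topic = "" with get, set
    member this.WithTopic(topic: string) : FakeConfig =
        this.Topic <- topic
        this

// Hypothetical method that, like Subscribe, expects an Action<_> callback.
type FakeBus() =
    static member Configure(configure: Action<FakeConfig>) : FakeConfig =
        let config = FakeConfig()
        configure.Invoke config
        config

// Does not compile: the lambda returns FakeConfig, but Action<FakeConfig>
// requires a lambda that returns unit.
// let bad = FakeBus.Configure(fun (c: FakeConfig) -> c.WithTopic("app.queue"))

// Compiles: piping into ignore discards the returned configuration,
// so the lambda returns unit and converts to Action<FakeConfig>.
let good = FakeBus.Configure(fun (c: FakeConfig) -> c.WithTopic("app.queue") |> ignore)

printfn "%s" good.Topic // prints "app.queue"
```

The mutation still happens before the result is discarded, which is why the topic is set even though the lambda returns unit.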