I am trying to subscribe to a RabbitMq topic using the PubSub.Subscribe method in EasyNetq with F#. The function subscribeToAppQueueWithoutTopic compiles and works but the subscribeToAppQueueWithTopic function will not compile at all.
let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived,
(fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue")), cts.Token)
Error FS0041 No overloads match for method 'Subscribe'.
Known types of arguments: string * (AppEnvelope -> unit) * (ISubscriptionConfiguration -> ISubscriptionConfiguration) * CancellationToken
Available overloads:
- (extension) IPubSub.Subscribe<'T>(subscriptionId: string, onMessage: Action<'T>, configure: Action<ISubscriptionConfiguration>, ?cancellationToken: CancellationToken) : ISubscriptionResult // Argument 'configure' doesn't match
- (extension) IPubSub.Subscribe<'T>(subscriptionId: string, onMessage: Func<'T,CancellationToken,Tasks.Task>, configure: Action<ISubscriptionConfiguration>, ?cancellationToken: CancellationToken) : ISubscriptionResult // Argument 'onMessage' doesn't match
I found a c# example of subscribing with a topic here EasyNetQ subscription tests which looks like this
bus.PubSub.SubscribeAsync<Message>(
Guid.NewGuid().ToString(),
firstTopicMessagesSink.Receive,
x => x.WithTopic("first"),
cts.Token
and thought I could use fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue")
as equivalent in F#. Alas this will not compile.
Here is an example app showing the problem
open System
open EasyNetQ
open System.Threading
type Subscription<'T> = { OnMessageReceived: 'T -> unit }
[<Queue("appqueue", ExchangeName = "demopappexchange")>]
type AppEnvelope = { Message : obj }
[<EntryPoint>]
let main argv =
let bus = RabbitHutch.CreateBus("host=localhost")
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
let printMessage message =
printfn "%s" message
let subscription = {
OnMessageReceived = fun (envelope: AppEnvelope) -> (envelope.Message.ToString() |> printMessage )
}
let sendToAppWithTopic message =
async {
do! bus.PubSub.PublishAsync({AppEnvelope.Message = message}, "app.queue") |> Async.AwaitTask
// bus.Dispose()
} |> Async.Start
let subscribeToAppQueueWithoutTopic (callback : Subscription<AppEnvelope>) =
printfn "subscribe called"
bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived)
(* ** Will not compile **
let subscribeAsyncToAppQueueWithTopic =
async {
do! bus.PubSub.SubscribeAsync<AppEnvelope>(String.Empty, callback.OnMessageReceived,
fun (x: ISubscriptionConfiguration) -> x.WithTopic "scanservice.queue")
|> Async.AwaitTask
} |> Async.Start
*)
// Will not compile
let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
bus.PubSub.Subscribe<AppEnvelope>(String.Empty, callback.OnMessageReceived, (fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue")), cts.Token)
subscribeToAppQueueWithoutTopic subscription |> ignore
sendToAppWithTopic "Testing"
Console.ReadKey() |> ignore
0
I don't know anything about EasyNetQ, but I think the problem here is that WithTopic
returns a reference to the mutated configuration, which you need to explicitly ignore in F# in order to produce an Action<_>
, like this:
let subscribeToAppQueueWithTopic (callback : Subscription<AppEnvelope>) =
bus.PubSub.Subscribe<AppEnvelope>(
String.Empty,
callback.OnMessageReceived,
(fun (x:ISubscriptionConfiguration) -> x.WithTopic("app.queue") |> ignore),
cts.Token)
Apparently, the API does this in order to provide a fluent C# interface:
/// <summary>
/// Allows publish configuration to be fluently extended without adding overloads
///
/// e.g.
/// x => x.WithTopic("*.brighton").WithPriority(2)
/// </summary>
public interface IPublishConfiguration
{
/// <summary>
/// Sets a priority of the message
/// </summary>
/// <param name="priority">The priority to set</param>
/// <returns>Returns a reference to itself</returns>
IPublishConfiguration WithPriority(byte priority);
/// <summary>
/// Sets a topic for the message
/// </summary>
/// <param name="topic">The topic to set</param>
/// <returns>Returns a reference to itself</returns>
IPublishConfiguration WithTopic(string topic);
From a functional programming perspective, this is a confusing way to do things, but such is life in the C# world, I suppose.