Search code examples
twitterf#tweetinvi

Observable vs FSharpx asyncSeq


I have the following tweet stream class. It has the TweetReceived event which can be used with the other components of my system.

It seems to work ok but I have the feeling that it's more complicated than it should be.

Are there any tools out there to give me this functionality without having to implement the mbox/event mechanism by myself?

Also would you recommend to use asyncSeq instead of IObservable?

Thanks!

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let event = new Event<_>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 

    member x.TweetReceived = event.Publish

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agent.Post))
        |> ignore

    member x.Stop = stream.StopStream

UPDATE: Thanks Thomas for the quick (as always) answer to the second question.

My first question may be a little unclear so I refactored the code to make the class AgentEvent visible and I rephrase the first question: is there a way to implement the logic in AgentEvent easier? Is this logic implemenented already in some place?

I'm asking this because it feels like a common usage pattern.

type AgentEvent<'t>()=
    let event = new Event<'t>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 
    member x.Event = event.Publish
    member x.Post = agent.Post

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let agentEvent = AgentEvent()

    member x.TweetReceived = agentEvent.Event

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agentEvent.Post))
        |> ignore

    member x.Stop = stream.StopStream

Solution

  • I think that IObservable is the right abstraction for publishing the events. As for processing them, I would use either Reactive Extensions or F# Agents (MailboxProcessor), depending on what you want to do.

    Note that F# automatically represents events as IObservable values (actually IEvent, but that inherits from observable), so you can use Reactive Extensions directly on TweetReceived.

    What is the right representation?

    • The main point of asyncSeq is that it lets you control how quickly the data is generated - it is like async in that you have to start it to actually do the work and get a value - so this is useful if you can start some operation (e.g. download next few bytes) to get the next value

    • IObservable is useful when you have no control over the data source - when it just keeps producing values and you have no way to pause it - this seems more appropriate for tweets.

    As for processing, I think that Reactive Extensions are nice when they already implement the operations you need. When you need to write some custom logic (that is not easily expressed in Rx), using Agent is a great way to write your own Rx-like functions.