Search code examples
f#reactive-programmingmailboxprocessor

Waiting for database rows to load using TableDependency and F#


I've got an F# project that loads some files to an outside subsystem and then uses Table Dependency to wait for some rows to be added to a table as a side effect.

Table Dependency is used in the type below to watch for the db changes. It fires a custom event when a row is added/changed/whatever:

// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() = 
    let mutable someId = ""

    // this property name matches the name of the table column (SomeId)
    member this.SomeId 
        with get () = someId
        and set (value) = someId <- value

// AccountLoadWatcher
type AccountLoadWatcher() = 
    let mutable _tableDependency = null
    let event = new Event<_>()

    interface IDisposable with
        member this.Dispose() = 
            _tableDependency.Stop()
            _tableDependency.Dispose()

    // custom event we can send when an account is loaded

    [<CLIEvent>]
    member this.AccountLoaded = event.Publish

    member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) = 
        let accountLoaded = sender.Entity
        event.Trigger(accountLoaded.SomeId)

    member this.Watch() = 
        _tableDependency <- DbLib.getTableDependency "dbo" "AccountTable" 
                                null
        _tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
        _tableDependency.Start()

What I want to do is take the above object and just wait for all the rows with ids I care about to be loaded. What I have so far is:

let waitForRows(csvFileRows) =
  let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows

  let mutable collected = Set.empty
  let isInSet id = Set.contains id idsToWaitFor
  let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
  let accountLoadedHandler id = 
      collected <- collected.Add id
      printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
  loadToSubsystem csvFileRows |> ignore

  // wait for all the watcher events; filtering each event object for ids we care about
     watcher.AccountLoaded
           |> Observable.takeWhile (fun _ -> notDone) 
           |> Observable.filter (fun e -> isInSet e) 
           |> Observable.subscribe accountLoadedHandler
           |> ignore

  doMoreWork()

but that just continues to doMoreWork without waiting for all the events i need above.

Do I need to use a task or async? F# Agents?


Solution

  • Given that you are using Observable.takeWhile in your example, I'm assuming that you are using the FSharp.Control.Reactive wrapper to get access to the full range of reactive combinators.

    Your approach has some good ideas, such as using takeWhile to wait until you collect all IDs, but the use of mutation is quite unfortunate - it might not even be safe to do this because of possible race conditions.

    A nice alternative is to use one of the various scan function to collect a state as the events happen. You can use Observable.scanInit to start with an empty set and add all IDs; followed by Observable.takeWhile to keep accepting events until you have all the IDs you're waiting for. To actually wait (and block), you can use Observable.wait. Something like this:

    let waitForRows(csvFileRows) =
      let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
    
      let finalCollectedIDs = 
        watcher.AccountLoaded
        |> Observable.scanInit Set.empty (fun collected id -> Set.add id collected) 
        |> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
        |> Observable.wait
    
      printfn "Completed. Final collected IDs are: %A" finalCollectedIDs