I've got an F# project that loads some files to an outside subsystem and then uses Table Dependency to wait for some rows to be added to a table as a side effect.
Table Dependency is used in the type below to watch for the db changes. It fires a custom event when a row is added/changed/whatever:
    // just using this type for the RecordChangedEvent to marshal the id we want into something
    type AccountLoaded() =
        let mutable someId = ""
        // this property name matches the name of the table column (SomeId)
        member this.SomeId
            with get () = someId
            and set (value) = someId <- value

    // AccountLoadWatcher
    type AccountLoadWatcher() =
        let mutable _tableDependency = null
        let event = new Event<_>()

        interface IDisposable with
            member this.Dispose() =
                _tableDependency.Stop()
                _tableDependency.Dispose()

        // custom event we can send when an account is loaded
        [<CLIEvent>]
        member this.AccountLoaded = event.Publish

        member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) =
            let accountLoaded = sender.Entity
            event.Trigger(accountLoaded.SomeId)

        member this.Watch() =
            _tableDependency <- DbLib.getTableDependency "dbo" "AccountTable" null
            _tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
            _tableDependency.Start()
What I want to do is take the above object and just wait for all the rows with ids I care about to be loaded. What I have so far is:
    let waitForRows(csvFileRows) =
        let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
        let mutable collected = Set.empty
        let isInSet id = Set.contains id idsToWaitFor
        let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
        let accountLoadedHandler id =
            collected <- collected.Add id
            printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
        loadToSubsystem csvFileRows |> ignore
        // wait for all the watcher events; filtering each event object for ids we care about
        watcher.AccountLoaded
        |> Observable.takeWhile (fun _ -> notDone)
        |> Observable.filter (fun e -> isInSet e)
        |> Observable.subscribe accountLoadedHandler
        |> ignore
        doMoreWork()
But that just continues to doMoreWork without waiting for all the events I need above.
Do I need to use a task or async? F# Agents?
Given that you are using Observable.takeWhile in your example, I'm assuming that you are using the FSharp.Control.Reactive wrapper to get access to the full range of reactive combinators.
Your approach has some good ideas, such as using takeWhile to wait until you collect all IDs, but the use of mutation is quite unfortunate: it might not even be safe because of possible race conditions.
A nice alternative is to use one of the various scan functions to collect state as the events happen. You can use Observable.scanInit to start with an empty set and add all IDs, followed by Observable.takeWhile to keep accepting events until you have all the IDs you're waiting for. To actually wait (and block), you can use Observable.wait. Something like this:
    let waitForRows(csvFileRows) =
        let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
        let finalCollectedIDs =
            watcher.AccountLoaded
            |> Observable.scanInit Set.empty (fun collected id -> Set.add id collected)
            // takeWhile completes on, and does not emit, the first set that contains
            // every awaited ID, so the value returned by wait is the last set seen before that
            |> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor collected))
            |> Observable.wait
        printfn "Completed. Final collected IDs are: %A" finalCollectedIDs
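If you would rather not depend on FSharp.Control.Reactive, the same scan-then-wait idea can be sketched with only the Observable module from FSharp.Core plus a ManualResetEventSlim for the blocking part. This is a rough sketch under a few assumptions, not a drop-in replacement: waitForIds, startWork and demoEvent are names made up for illustration, the timeout is arbitrary, and it assumes events are raised one at a time. It subscribes before starting the work so that early events are not missed.

```fsharp
open System
open System.Threading

/// Hypothetical helper: subscribes to `source`, runs `startWork`, then blocks until
/// every id in `idsToWaitFor` has been seen or `timeout` elapses.
/// Returns true if all ids arrived in time.
let waitForIds (timeout: TimeSpan)
               (idsToWaitFor: Set<string>)
               (source: IObservable<string>)
               (startWork: unit -> unit) : bool =
    use allSeen = new ManualResetEventSlim(false)
    // Accumulate the ids seen so far with scan (no mutable set needed) and
    // signal once the accumulated set covers everything we are waiting for.
    use _subscription =
        source
        |> Observable.scan (fun collected id -> Set.add id collected) Set.empty
        |> Observable.filter (fun collected -> Set.isSubset idsToWaitFor collected)
        |> Observable.subscribe (fun _ -> allSeen.Set())
    // Start the work only after subscribing, so no event can slip past us.
    startWork ()
    // Nothing to wait for if the set is empty; otherwise block (with a timeout).
    idsToWaitFor.IsEmpty || allSeen.Wait(timeout)

// Hypothetical stand-in for watcher.AccountLoaded.
let demoEvent = Event<string>()

let ok =
    waitForIds (TimeSpan.FromSeconds 5.0) (set [ "a"; "b" ]) demoEvent.Publish (fun () ->
        // Hypothetical stand-in for loadToSubsystem: ids arrive from another thread.
        async { for id in [ "a"; "x"; "b" ] do demoEvent.Trigger id } |> Async.Start)

printfn "All ids seen: %b" ok
```

In your code, the source would be watcher.AccountLoaded and startWork would wrap the loadToSubsystem call. The timeout is there so a row that never shows up cannot block the caller forever.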