Search code examples
.netasynchronousf#cancellationtokensource

Cancelling long run api request


I have an issue with managing the CancellationTokenSources related to many long-running database queries.

My app uses Suave.IO as webserver and neo4j as graph db with a Blazor webassembly client.

Using the Blazor virtualised component with an items provider, a user can scroll quickly through a list of data triggering many fairly long-running queries on the graph db when the component makes an httpclient request to the api. The number of requests can overload the database's configured transaction memory.

My attempted solution is to cancel the queries by picking up the token in the async request and closing the session which causes the query to rollback. This part works fine. The issue I'm having is how to manage the CancellationTokenSource associated with a request.

The below is is flawed and won't work as the async nature of requests doesn't play nice with the Map

let mutable private csStore = Map.empty<Common.Types.HashCode, CancellationTokenSource>

let private handleCancel hashCode =
    let makeAndAdd() =
        let newCts = new CancellationTokenSource()
        csStore <- csStore |> Map.add hashCode newCts
        newCts

    match csStore |> Map.tryFind hashCode with
    | Some cts ->
        printfn $"now cancel"
        cts.Cancel()
        makeAndAdd()
    | None -> 
        printfn $"not found"
        makeAndAdd()

//start the task first cancelling if request with given hash code already present
let runner hashCode f =
    let token = (handleCancel hashCode).Token
    Async.StartAsTask(f, cancellationToken = token)
    |> Async.AwaitTask

//clear the store, disposing etc

and called like this;

let tryCancel (id : int) =
        fun ctx ->
            asyncResult {
                let hashCode = HashCode.create id
                let! res = Cancellable.runner hashCode <| (runAQuery id)
                //TODO: dispose cts

                return! SuaveJson.JSON res OK ctx
            } 
            |> handleResultPassthroughError (fun err -> BAD_REQUEST err ctx)

Obviously it would need to include some disposal of CancellationTokenSource.

The hashCode object is created from the parameters. In a real scenario, the request parameters include a user id, node type, sort direction, sort parameter, limit of records to return and finally a skip parameter determining the number of records to skip. The hashCode is created from the items that do not change with multiple requests (i.e. the user id, node type, sort direction and parameter and limit) such that a request should be cancelled when only the skip number changes. If there are 100 records and the page size is 5, a user can rapidly scroll to the end creating 20 simultaneous requests, the last of which is the only one of interest. The theory is that the first 19 are cancelled and the last succeeds. The key part is that only certain requests should be cancelled so I need some kind of mechanism to store the CancellationTokenRequests.

However, the async nature of the requests mean that the store hashCode is not found and all requests execute rather than any or some cancelling. This varies so I presume there is a race condition.

How can I change the code to robustly store the CancellationTokenSources associated with each request and therefore cancel them when a subsequent request is received?

EDIT -------------------

Based on Guro Stron's answer, this is a working example which successfully cancels previous requests and allows the last to succeed. The lock has been extended from the original answer so the answer was correct really, just needed a tweak.

type HashCode = private HashCode of int

module HashCode =
    let create hashCode = HashCode hashCode

module Cancellable =
    let private csStore = ConcurrentDictionary<HashCode, CancellationTokenSource>()

    let private handleCancel hashCode =
        let makeAndAdd() =
            csStore.GetOrAdd(hashCode, (fun _ -> new CancellationTokenSource()))

        lock csStore (fun () ->
            match csStore.TryRemove hashCode with
            | true, cts ->
                cts.Cancel()
                cts.Dispose()
                makeAndAdd()
            | false, _ ->
                makeAndAdd()
            )

    //start the task first cancelling it if already running with the given hash code
    let runner hashCode f =
        let token = (handleCancel hashCode).Token
        Async.StartAsTask(f, cancellationToken = token)
        |> Async.AwaitTask

let runAQuery iter =
    async {

        use! holder = Async.OnCancel(fun _ ->
            printfn $"cancelled iter: {iter}"
            )

        do! Async.Sleep 10000
        return 99
    }

let tryCancel (id : int) (iter : int) =
    fun ctx ->
        async {
            let hashCode = HashCode.create id
            let! res = Cancellable.runner hashCode <| runAQuery iter

            return! OK "some result" ctx
        }

However, I'm unclear why I don't receive a console notification for EVERY cancellation. For example, hitting the tryCancel end point with this code via a script using the Http utilities in FSharp.Data,

let reqs =
    let url iter = $"http://localhost:5022/trycancel/1/{iter}"
    seq {
        for i in [ 1 .. 20 ] do
            async { return Http.Request(url i, httpMethod="GET") }
    }

#time "on"
let res =
    Async.Parallel(reqs)
    |> Async.RunSynchronously
#time "off"

only writes the 'cancelled iter: x' message for some cancellations (3 or 4 typically and the iteration varies).

Does that mean that Async.Cancel(fun ...) is not being called or something else?


Solution

  • Consider using ConcurrentDictionary. Something to start with:

    let private csStore = ConcurrentDictionary<Common.Types.HashCode, CancellationTokenSource>()
    let private lockStore = ConcurrentDictionary<Common.Types.HashCode, Object>()
    
    let private handleCancel hashCode =
        let makeAndAdd() =
            csStore.GetOrAdd(hashCode, new CancellationTokenSource())
    
        let locker = lockStore.GetOrAdd(hashCode, Object())
    
        lock locker (fun () ->
            match csStore.TryRemove hashCode with
            | true, cts ->
                printfn $"now cancel"
                try // better option - add remove method which will lock the locker and remove the CancellationTokenSource if it matches
                    cts.Cancel()
                    cts.Dispose()
                with
                    | :? ObjectDisposedException -> () 
                makeAndAdd ()
            | false, _ ->
                printfn $"not found"
                makeAndAdd ())