Search code examples
azure-cosmosdbgremlinazure-cosmosdb-gremlinapi

Add or subtract from a property value in CosmosDB Gremlin in a single query


I am trying to figure out how to add or subtract from an existing property (number type) on a Graph node using the Gremlin API and CosmosDb.

Let's say I want to keep track of an inventory count of widgets in a bin:

g.addV('Bin').Property('id', '1').Property('Widgets', 100)

I can get the current value easily:

g.V('1').Properties('Widgets').Value()

And update the value, say if I take a widget from the bin, I would do my math and then set the new value:

g.V('1').Property('Widgets', 99)

The problem is that this may be happening with a high degree of concurrency. There is a strong chance of race conditions where the application gets the current value, then it is changed by another consumer before it can update with the new value.

Currently these steps simply happen as close as possible to each other (literally only the math step in between), but I am now looking at possibly a few dozen concurrent processes all doing this.

I imagined a query something like this:

g.V('1').Properties('Widgets').Value().Store('x').V('1').Property('Widgets', Math('x - 1'))

But unfortunately the math() step is not supported in CosmosDB.

That led me to try passing the result of a traversal in, but I get type errors where CosmosDB complains that the input is a traversal instead of a value, even when (I think?) it should get the value.

Example:

g.V('1').Properties('Widgets').Value().Store('x').V('1').Property('Widgets', Select('x'))

Returns this error:

Failure in submitting query: g.V('1').Properties('Widgets').Value().Store('x').V('1').Property('Widgets', select('x')): Script eval error: ActivityId : 3438873e-2cca-4c65-a221-dc56c4468a5f ExceptionType : GraphRuntimeException ExceptionMessage : Gremlin Query Execution Error: Cannot create ValueField on non-primitive type GraphTraversal. GremlinRequestId : 86242ae1-5e1e-4cf4-8cbc-a68fccf535b0 Context : graphcompute Scope : graphstg-logplan GraphInterOpStatusCode : GraphRuntimeError HResult : 0x80131500

I tried a couple of variations, this one returns the same error:

g.V('1').Properties('Widgets').Value().Store('x').V('1').Property('Widgets', Select('x').Value())

And this one set the value of 'Widgets' to the literal value 'x'....

g.V('1').Properties('Widgets').Value().Store('x').V('1').Property('Widgets', 'x')

There's also the issue of telling the query how many widgets I took, but if the first problem is solved then that follows.

I am thinking I might end up on something like this, where a placeholder property is set and the sum function output passed in as the new value. For example, this traversal/s gives the correct desired value:

g.V('1').Property('Diff', -1).V('1').Properties('Widgets', 'Diff').Value().Sum()

But if you try to pass this in as the new value of the Widgets property it throws the same type error as above (will not accept a traversal):

g.V('1').Property('Diff', -1).V('1').Properties('Widgets', 'Diff').Value().Sum().as('x').V('1').Property('Widgets', select('x'))

Solution

  • So I was not able to get this to work in a single query, but I was able to piece together a concurrency management solution with the help of the docs and some trial and error.

    I'm using the Gremlin.Net package in my application for this, which is not developed by Microsoft, so this contributes somewhat to having to translate between the Apache and Microsoft implementations.

    The docs here describe (very briefly and with no real explanation) how to access the etag data in CosmosDB Gremlin: https://learn.microsoft.com/en-us/azure/cosmos-db/gremlin/access-system-properties#e-tag

    The documentation there is literally a single sentence, and not very helpful, but with some testing I found out that:

    • CosmosDB Gremlin does not give you access to system fields like etag by default, even though the documentation says it does, if you try CosmosDb throws an exception
    • To get access to these fields, you must (every time) pre-pend your query with a traversal strategy that makes the relevant system fields available:
    g.withStrategies(ProjectionStrategy.build().IncludeSystemProperties('_etag').create()).V(... the rest of your query that uses the etag value somewhere)
    

    This may allow the mentioned headers to be passed in these arguments to CosmosDb using the RequestMessage (provided you got the etag in the first place using ProjectionStrategy), however I did not attempt this as the docs state this constraint causes a 412 exception to be thrown, and I did not want to use that to control the flow of something I expect to happen frequently.

    The solution required some refactoring of my query methods, but thankfully these were all private and didn't affect anything else.

    Instead of just getting the value like before, I grab both the current value and the etag of the item, which I wrapped in a WidgetInventory object:

    // The GremlinClient is g
    // Includes dealing with (avoiding?) CosmosDb's persnickety serializer
    
    private static async Task<WidgetInventory> GetWidgetInventoryAsync(string id)
    {
        var parameters = new Dictionary<string, object>() {{"id", id}}    
    
        var rawResult = await g.SubmitWithSingleResultAsync<dynamic>("g.withStrategies(ProjectionStrategy.build().IncludeSystemProperties('_etag').create()).V(id).Project(Property, '_etag').by('Widgets').by('_etag')", parameters);
    
        var result = JsonSerializer.Deserialize<WidgetInventory>(JsonSerializer.Serialize(rawResult));
    
        return result;
    }
    
    public class WidgetInventory
    {
        [JsonPropertyName("Widgets")]
        public int Count { get; set; }
    
        [JsonPropertyName("_etag")]
        public string Etag { get; set; } = String.Empty;
    }
    

    And the subsequent update operation now takes a WidgetInventory as an argument. Using a .has() step to update only if the etag is a match, then returning the new value as a nullable integer let's me do the flow control on a null check (because you don't even have a node if the etag doesn't match, so you get nothing):

    private static async Task<int?> SetWidgetInventoryAsync(string id, int widgets, WidgetInventory etagData)
    {
        var parameters = new Dictionary<string, object>() { { "id", id }, { "Widgets", widgets }, { "etag", etagData.Etag } };
    
        int? result = null;
        
        //Update the value and spit out the same - returns null if no node was matched by the has() step.
        result = await g.SubmitWithSingleResultAsync<int?>("g.withStrategies(ProjectionStrategy.build().IncludeSystemProperties('_etag').create()).V(id).has('_etag', etag).Property('Widgets', widgets).Properties('Widgets').Value()", parameters);
        
        return result;
    }
    

    Usage via public method - the below is simplified to a short recursive function - obviously you need attempt limit of some sort and other error handling (eg: not enough widgets). These things are omitted for brevity:

    public GetWidgetsFromBin(int quantity, string binId)
    {
        var widgets = await GetWidgetInventoryAsync(binId);
    
        // Taking quantity widgets
        var newWidgetsCount = widgets - quantity;
        
        // Updating the inventory, and retrying if it didn't work
        int? result = await SetWidgetInventoryAsync(binId, newWidgetsCount, widgets);
    
        if(result is null)
        {
            GetWidgetsFromBin(quantity, binId);
        }
    }
    

    This solution is working for me.