Search code examples
pythonconcurrencytransactionsneo4jpy2neo

How to perform an atomic update on relationship properties with Py2neo


I'm trying to update some properties in a Neo4j graph using Py2neo. I'd like to perform an atomic counter increment, then update some other properties only if the counter is a certain value. I also need to do this in a thread safe way so that other agents can't mess up the increment, or overwrite the property values before the entire set of values is committed. I believe it will look something like this:

tx = graph.cypher.begin()
try:
    tx.acquireWriteLock(relationship) # from Java, not sure what this is in py2neo
    count = relationship.getProperty("count", 1);
    relationship.setProperty("count", count+1 );
    if count == threshold:
        relationship.setProperty("another_property", some_value );
    tx.success();
finally:
    tx.finish();

Some of the code above is guessed or taken from Java examples, If anyone could help with the python equivilant or point me in the direction of some example code that does the same I'd really appreciate it.


Solution

  • I ultimately found a cleaner more thread safe way to accomplish my requirements using Cypher. Thanks to the neo4j-users Slack channel I was informed that locks need to be acquired to avoid 'lost updates' in concurrent use cases like this. Cypher has no explicit ability to lock a node, however adding or in fact removing a non existent property will cause Cypher to lock the nodes for the update. This is discussed here: http://neo4j.com/docs/stable/transactions-isolation.html

    My Cypher query ended up looking like this:

    MATCH (a:Person {name: 'a'}), (b:Person {name: 'b'})
    REMOVE a.__notexisitng__, b.__notexisiting__
    WITH a,b
    MATCH (a)-[r:KNOWS]-(b)
    SET r.count = r.count + 1
    WHERE r.count = threshold 
    SET r.another_property = 'some value'
    

    To run this in py2neo, the following code does the job:

    statement = "cypher query above ^"
    parameters = {}
    response = graph.cypher.post(statement, parameters)