Tags: c++, c, couchbase, pessimistic-locking

How to use "pessimistic" Couchbase locking with the C API?


This is incredibly frustrating... I'm using the Couchbase v3 C API, and I ran into a problem that would be solved perfectly by using their "pessimistic" locking. After much effort I believe I've finally figured out how to lock records using it (with lcb_cmdget_locktime(), which presumably takes a parameter in microseconds), but I can't figure out how to unlock the record once I'm done with it, other than by letting it time out, which isn't an acceptable solution.

The documentation provides one example, here, but for some unknown reason it's in Javascript instead of C (!!!), and the concepts don't map to the C API.

Does anyone know how to unlock a pessimistic lock, or have any example C/C++ code using that API? Barring that, does anyone know where to find the source code for any of the non-C APIs that use the C one, since I should be able to work it out from there? (I haven't been able to locate any of those either, not sure they're open-source.)


Solution

  • Thank you for the question.

    The lock time is given in seconds, not microseconds. Note that the server might reset it to the default if it is larger than the maximum lock time (both durations are configurable). The following command shows the effective values for this feature:

    $ cbstats -u Administrator -p password  localhost all | grep ep_getl
    ep_getl_default_timeout:                               15
    ep_getl_max_timeout:                                   30
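
To make the rule above concrete, here is a minimal sketch that models how the requested lock time might be resolved on the server side. The function `effective_lock_time` and its parameter names are hypothetical (they are not part of libcouchbase), the defaults mirror the `cbstats` output above, and treating 0 as "use the default" is an assumption worth verifying against your server version.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper, not a libcouchbase call: models the rule that a
// requested lock time above the maximum is reset to the default.
std::uint32_t effective_lock_time(std::uint32_t requested_s,
                                  std::uint32_t default_s = 15, // ep_getl_default_timeout
                                  std::uint32_t max_s = 30)     // ep_getl_max_timeout
{
    assert(default_s <= max_s);
    // Assumption: 0 means "no explicit lock time", so the default applies.
    if (requested_s == 0 || requested_s > max_s) {
        return default_s;
    }
    return requested_s;
}
```

With the values shown above, a request for 5 seconds is kept as-is, while a request for 31 seconds would fall back to 15 rather than being capped at 30.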
    

    To lock the key, use a get operation with the lock time set via lcb_cmdget_locktime, and capture the CAS value if the lock succeeds, for instance like this:

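    #include <libcouchbase/couchbase.h>

    // Result slot handed to the callback through the operation cookie.
    struct my_result {
        lcb_STATUS status{LCB_SUCCESS};
        uint64_t cas{0};
    };

    // Register once after creating the instance:
    // lcb_install_callback(instance, LCB_CALLBACK_GET, reinterpret_cast<lcb_RESPCALLBACK>(get_callback));
    static void get_callback(lcb_INSTANCE *, lcb_CALLBACK_TYPE, const lcb_RESPGET *resp)
    {
        my_result *res = nullptr;
        lcb_respget_cookie(resp, reinterpret_cast<void **>(&res));
        res->status = lcb_respget_status(resp);
        if (res->status == LCB_SUCCESS) {
            // After a successful get-with-lock, this CAS acts as the lock token.
            lcb_respget_cas(resp, &res->cas);
        }
    }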
    

    It is a good idea to put the get-with-lock code into a retry loop, because the document may already be locked by another client:

    // Needs <chrono>, <iostream>, <string>, and <thread>.
    uint64_t locked_cas{0};
    int retries = 3;
    while (retries > 0) {
        std::string document_id{"foo"};
        my_result result{};
        lcb_CMDGET* cmd = nullptr;
        lcb_cmdget_create(&cmd);
        lcb_cmdget_key(cmd, document_id.c_str(), document_id.size());
        lcb_cmdget_locktime(cmd, 5); // lock for 5 seconds
        lcb_get(instance, &result, cmd);
        lcb_cmdget_destroy(cmd);
        lcb_wait(instance, LCB_WAIT_DEFAULT);

        // my_result stores the outcome in `status` (filled in by get_callback).
        if (result.status == LCB_SUCCESS) {
            locked_cas = result.cas;
            break;
        } else if (result.status == LCB_ERR_DOCUMENT_LOCKED || result.status == LCB_ERR_TEMPORARY_FAILURE) {
            // Another client holds the lock, or the server is busy: pause and retry.
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            --retries;
            continue;
        } else {
            std::cerr << "Unexpected issue during get with lock: " << lcb_strerror_short(result.status) << "\n";
            break;
        }
    }
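
The retry logic in the loop above can also be factored out so that it is testable without a cluster. The following is only a sketch under stated assumptions: `LockStatus` is a hypothetical stand-in for the relevant `lcb_STATUS` values, `lock_with_backoff` is not a libcouchbase function, and doubling the delay between attempts is a design choice that the original loop (fixed 100 ms pause) does not make.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical status type standing in for lcb_STATUS in this sketch.
enum class LockStatus { Success, Locked, TemporaryFailure, OtherError };

// Calls try_lock up to max_attempts times, doubling the pause after each
// retryable failure. Returns the last status that try_lock reported.
LockStatus lock_with_backoff(const std::function<LockStatus()> &try_lock,
                             int max_attempts,
                             std::chrono::milliseconds initial_delay)
{
    assert(max_attempts > 0);
    auto delay = initial_delay;
    LockStatus status = LockStatus::OtherError;
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        status = try_lock();
        const bool retryable = status == LockStatus::Locked ||
                               status == LockStatus::TemporaryFailure;
        if (!retryable || attempt == max_attempts) {
            break; // success, hard error, or out of attempts
        }
        std::this_thread::sleep_for(delay);
        delay *= 2;
    }
    return status;
}
```

In real code, `try_lock` would wrap the `lcb_get` / `lcb_wait` sequence and translate `result.status` into one of the enum values. Keep the total retry time in mind: it should stay well below the lock time other clients are likely to request, otherwise the loop gives up before their locks expire.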
    

    Once the key is locked, the update can be performed, but keep in mind that it MUST use locked_cas; otherwise the mutation will fail. A successful mutation with this CAS also releases the lock.

    std::string document_id{"foo"};
    std::string new_value{"new value"};
    lcb_CMDSTORE* cmd = nullptr;
    lcb_cmdstore_create(&cmd, LCB_STORE_REPLACE);
    lcb_cmdstore_key(cmd, document_id.c_str(), document_id.size());
    lcb_cmdstore_value(cmd, new_value.c_str(), new_value.size());
    lcb_cmdstore_cas(cmd, locked_cas);
    lcb_store(instance, nullptr, cmd);
    lcb_cmdstore_destroy(cmd);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
    

    To unlock the key without modifying it, you also need locked_cas. Otherwise, you have to wait until the server unlocks the document automatically when the lock time expires.

    std::string document_id{"foo"};
    lcb_CMDUNLOCK *cmd = nullptr;
    lcb_cmdunlock_create(&cmd);
    lcb_cmdunlock_key(cmd, document_id.c_str(), document_id.size());
    lcb_cmdunlock_cas(cmd, locked_cas);
    lcb_unlock(instance, nullptr, cmd);
    lcb_cmdunlock_destroy(cmd);
    lcb_wait(instance, LCB_WAIT_DEFAULT);
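
Since forgetting the unlock step leaves the document locked until the timeout, one option is to tie the unlock call to scope exit. Below is a minimal sketch of such a guard. `UnlockGuard` is a hypothetical class, not part of libcouchbase; the action passed to it would contain the `lcb_unlock` sequence shown above. The `release()` method covers the case where no explicit unlock is needed any more, for example if a mutation with locked_cas has already released the lock (an assumption about server behavior worth confirming for your version).

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical guard: runs the supplied unlock action when it goes out of
// scope, unless release() was called first.
class UnlockGuard {
  public:
    explicit UnlockGuard(std::function<void()> unlock) : unlock_(std::move(unlock))
    {
        assert(unlock_ && "an unlock action is required");
    }
    UnlockGuard(const UnlockGuard &) = delete;
    UnlockGuard &operator=(const UnlockGuard &) = delete;
    ~UnlockGuard()
    {
        if (unlock_) {
            unlock_();
        }
    }
    // Call when the lock no longer needs an explicit unlock.
    void release() { unlock_ = nullptr; }

  private:
    std::function<void()> unlock_;
};
```

A typical use would create the guard right after the get-with-lock succeeds, capturing `instance`, the document id, and `locked_cas` in the lambda, so that early returns and exceptions still unlock the document.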