
etcd mutex lock with ttl


I'm trying to write a simple demo Go etcd client program that uses an etcd mutex to create a shared lock with a timeout. The goal is to have the mutex expire after some period of time.

package main

import (
    "context"
    "log"
    "time"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/concurrency"
)

// c receives each goroutine's id once it has finished.
var c = make(chan int)

func main() {
    client, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"http://localhost:2379"},
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    watcher := clientv3.NewWatcher(client)
    channel := watcher.Watch(context.Background(), "/foobar", clientv3.WithPrefix())
    // Log every change under the /foobar prefix (lock keys being created and deleted).
    go func() {
        for change := range channel {
            for _, ev := range change.Events {
                log.Printf("etcd change on key; %s, type = %v", string(ev.Kv.Key), ev.Type)
            }
        }
    }()

    go lockFoobar(client, 1)
    go lockFoobar(client, 2)
    <-c
    <-c
}

func lockFoobar(client *clientv3.Client, id int) {
    res, err := client.Grant(context.Background(), 1)
    if err != nil {
        panic(err)
    }

    session, err := concurrency.NewSession(client, concurrency.WithLease(res.ID))
    if err != nil {
        panic(err)
    }

    mux := concurrency.NewMutex(session, "/foobar")

    log.Printf("trying to lock by #%d\n", id)
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel() // release the context's resources instead of discarding cancel
    if err := mux.Lock(ctx); err != nil {
        log.Printf("failed to lock #%d: %v\n", id, err)
        c <- id
        return
    }

    log.Printf("post-lock #%d (lease ID = %x) bullshit\n", id, res.ID)
    time.Sleep(10 * time.Second)
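    ttl, err := client.TimeToLive(context.TODO(), res.ID)
    if err != nil {
        // ttl is nil on error, so don't dereference it
        log.Printf("TimeToLive #%d: %v", id, err)
    } else {
        log.Printf("post-post-lock-#%d-sleep. lease ttl = %v", id, ttl.TTL)
    }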
    // mux.Unlock(ctx)
    // log.Printf("post-unlock #%d bullshit\n", id)

    time.Sleep(200 * time.Millisecond)
    c <- id
}

The lease has a TTL of 1 second, while the context has a timeout of 15 seconds, so the lock held by one goroutine should have expired long before the other goroutine's context times out. However, the held lock is always deleted only after the other goroutine's lock attempt has failed, no matter what the context timeout is.

This is the current output:

2018-10-04 18:39:59.413274 I | trying to lock by #2
2018-10-04 18:39:59.414530 I | trying to lock by #1
2018-10-04 18:39:59.414656 I | etcd change on key; /foobar/2a0966398d0677a2, type = PUT
2018-10-04 18:39:59.414684 I | post-lock #2 (lease ID = 2a0966398d0677a2) bullshit
2018-10-04 18:39:59.415617 I | etcd change on key; /foobar/2a0966398d0677a4, type = PUT
2018-10-04 18:40:10.239045 I | post-post-lock-#2-sleep. lease ttl = 1                       <-- lock for #2 has ttl = 1 even after 10s
2018-10-04 18:40:15.238871 I | failed to lock #1: context deadline exceeded                 <-- lock for #1 fails after 15s

As you can see, the lock for #2 is still alive even after 15s.

Running ETCDCTL_API=3 etcdctl watch --prefix=true /foobar in another terminal to watch the key changes shows the following output:

PUT
/foobar/2a0966398d0677a2

PUT
/foobar/2a0966398d0677a4

DELETE
/foobar/2a0966398d0677a4

DELETE
/foobar/2a0966398d0677a2

Is this the intended behavior? Is there a way to accomplish what I'm trying to do?

P.S.: The real-world use case is a program that runs as multiple instances and must not leave locks behind in etcd when an instance crashes or is killed (SIGKILL).


Solution

  • After some searching I found the reason for this behavior: the session keeps its lease alive until the client hits an error or the session's context is cancelled.

    From session.go:

    ...
    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            // eat messages until keep alive channel closes
        }
    }()
    ...
    

    Calling session.Orphan() after creating the mutex stops the session from keeping the lease alive, so the lease expires after its TTL and etcd deletes the lock key. That serves my purpose.