
Start multiple timers on UDP message with cancellation


I'm listening for messages on UDP. We've got devices that announce themselves this way, and each announcement also says when the next one will be sent. If the next announcement does not arrive in time, we assume the device is gone.

I'd like to keep a list of the devices currently on the network: add new devices and remove those I haven't heard from in time.

Here is what I've got so far.

1) I've got an in-memory db that holds all the devices:

func NewDB() *DB {
    return &DB{
        table: make(map[string]Announcement),
    }
}

type DB struct {
    mutex sync.Mutex
    table map[string]Announcement
}

func (db *DB) Set(ip string, ann Announcement) {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    db.table[ip] = ann
}

func (db *DB) Delete(ip string) {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    delete(db.table, ip)
}

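func (db *DB) Snapshot() map[string]Announcement {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    // return a copy: handing out db.table itself would let callers (such as
    // the JSON encoder) read the map after the lock has been released
    snapshot := make(map[string]Announcement, len(db.table))
    for ip, ann := range db.table {
        snapshot[ip] = ann
    }
    return snapshot
}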

2) I've got a web server that serves this db to my JavaScript frontend:

http.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(db.Snapshot())
})

// start server
go func() {
    log.Fatal(http.ListenAndServe(":8085", nil))
}()

3) And finally, I'm listening for UDP messages. Whenever a device is added to the db, I also create a timer with the provided timeout (here I just set it to 10 seconds). When a new message arrives, I check for an existing timer for that device, stop it if it exists, and start a new one that removes the device if it stops sending messages.

However, it doesn't really work. The AfterFunc callback is called far too often: although the device is still on the network, it is removed from my db. Any ideas?

// some global variable
var (
    timers = map[string]*time.Timer{}
)

for {
    // create new buffer
    b := make([]byte, 1500)

    // read message from udp into buffer
    n, src, err := conn.ReadFromUDP(b)
    if err != nil {
        panic(err)
    }

    // convert raw json bytes to struct
    var ann Announcement
    if err := json.Unmarshal(b[:n], &ann); err != nil {
        panic(err)
    }

    // add announcement to db
    ip := src.IP.String()
    db.Set(ip, ann)

    // check for existing timer
    timer, ok := timers[ip]
    if ok {
        log.Println("stopping timer", ip)
        // stop existing timer
        timer.Stop()
    }

    // start new timer for device
    timer = time.AfterFunc(time.Second*10, func() {
        log.Println("time after func", ip)
        delete(timers, ip)
        db.Delete(ip)
    })

    // store timer in timers db
    timers[ip] = timer

    time.Sleep(250 * time.Millisecond)
}

Solution

  • I think the problem you are having may be connected with the value of the ip variable captured by the function you pass to AfterFunc.

    timer = time.AfterFunc(time.Second*10, func() {
        log.Println("time after func", ip)
        delete(timers, ip)
        db.Delete(ip)
    })
    

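    The closure does not copy ip; it refers to the variable itself, so delete and db.Delete use whatever value ip holds when the timer expires. If ip is a single variable shared by all loop iterations and a packet from another device with a different IP arrives in the meantime, it is that other device that gets deleted. This applies when ip is declared outside the loop, or when it is the loop variable of a for statement in Go versions before 1.22. A variable declared with := inside the loop body, like ip := src.IP.String() in the code above, is new on every iteration, so it is worth checking where ip is declared in your actual code.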

    Example of what happens when ip is shared:

    • second 1: device 1.2.3.4 sends a UDP packet; ip = 1.2.3.4, AfterFunc is called and a 10-second timer starts
    • second 3: device 4.5.6.7 sends a UDP packet; now ip = 4.5.6.7, AfterFunc is called and a second 10-second timer starts
    • second 11: the first timer fires and deletes the current value of ip, so device 4.5.6.7 is deleted
    • second 13: the second timer fires and tries to delete 4.5.6.7 again

    As a result, device with IP 1.2.3.4 never gets deleted.

    You can fix that by wrapping the callback in a function that takes ip as an argument and is called immediately; the func() it returns then uses its own copy of ip:

    timer = time.AfterFunc(time.Second*10, func(ip string) func() {
        return func() {
            log.Println("time after func", ip)
            delete(timers, ip)
            db.Delete(ip)
        }   
    }(ip))
    

    Simpler working example:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
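        fmt.Println("Capturing value of i at the moment of execution of func()")
        // Go 1.21 and earlier: all five closures share one i, which is 5 by the
        // time the timers fire, so this prints "AfterFunc() with 5" five times.
        // Go 1.22 and later: each iteration has its own i, so this loop also
        // prints 0 through 4 (in no guaranteed order).
        for i := 0; i < 5; i++ {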
    
            afterFuncTimer := time.AfterFunc(time.Second*2, func() {
                fmt.Printf("AfterFunc() with %v\n", i)
            })
    
            defer afterFuncTimer.Stop()
        }
    
        time.Sleep(5 * time.Second)
    
        fmt.Println("Capturing value of i from the loop")
        // i is passed as an argument, so each closure gets its own copy in every Go version.
        for i := 0; i < 5; i++ {
    
            afterFuncTimer := time.AfterFunc(time.Second*2, func(i int) func() {
                return func() {
                    fmt.Printf("AfterFunc() with %v\n", i)
                }
            }(i))
    
            defer afterFuncTimer.Stop()
        }
    
        time.Sleep(5 * time.Second)
    
    }
    

    Run it on Go Playground: https://play.golang.org/p/bGWzTaWe3ZU