I'm listening for messages on UDP. We've got devices that announce themselves this way. They also say when they will send the next announcement. If this does not happen we assume a device is gone.
I'd like to make a list of devices that are currently in the network. I'd like to add new devices and remove those that I haven't heard from.
Here is what I've got so far.
1) I've got an in-memory db which holds all the devices.
func NewDB() *DB {
	return &DB{
		table: make(map[string]Announcement),
	}
}

type DB struct {
	mutex sync.Mutex
	table map[string]Announcement
}

func (db *DB) Set(ip string, ann Announcement) {
	db.mutex.Lock()
	defer db.mutex.Unlock()
	db.table[ip] = ann
}

func (db *DB) Delete(ip string) {
	db.mutex.Lock()
	defer db.mutex.Unlock()
	delete(db.table, ip)
}

func (db *DB) Snapshot() map[string]Announcement {
	db.mutex.Lock()
	defer db.mutex.Unlock()
	// return a copy so callers never read the live map without the lock
	snapshot := make(map[string]Announcement, len(db.table))
	for ip, ann := range db.table {
		snapshot[ip] = ann
	}
	return snapshot
}
2) I've got a web server that serves this db to my JavaScript frontend.
http.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(db.Snapshot())
})
// start server
go func() {
	log.Fatal(http.ListenAndServe(":8085", nil))
}()
3) And finally I'm listening for UDP messages. Whenever a new device is added to the db I also create a new timer with the provided timeout (here I just set it to 10 seconds). When a new message arrives I check for an existing timer, stop it if it exists, and start a new one, so that the device is cleared if it stops sending messages.
However, it doesn't really work. The AfterFunc is called way too often. Although the device is still in the network, it is removed from my db. Any ideas?
// some global variable
var (
	timers = map[string]*time.Timer{}
)

for {
	// create new buffer
	b := make([]byte, 1500)
	// read message from udp into buffer
	n, src, err := conn.ReadFromUDP(b)
	if err != nil {
		panic(err)
	}
	// convert raw json bytes to struct
	var ann Announcement
	if err := json.Unmarshal(b[:n], &ann); err != nil {
		panic(err)
	}
	// add announcement to db
	ip := src.IP.String()
	db.Set(ip, ann)
	// check for existing timer
	timer, ok := timers[ip]
	if ok {
		log.Println("stopping timer", ip)
		// stop existing timer
		timer.Stop()
	}
	// start new timer for device
	timer = time.AfterFunc(time.Second*10, func() {
		log.Println("time after func", ip)
		delete(timers, ip)
		db.Delete(ip)
	})
	// store timer in timers db
	timers[ip] = timer
	time.Sleep(250 * time.Millisecond)
}
I think the problem you are having may be connected with the value of the ip variable you are capturing in the AfterFunc func.
timer = time.AfterFunc(time.Second*10, func() {
	log.Println("time after func", ip)
	delete(timers, ip)
	db.Delete(ip)
})
With this code, delete will be invoked with the value the ip variable holds at the moment the timer expires, not the value it had when the timer was created. So if in the meantime you received a packet from another device with a different IP, it's that one that will be deleted.
Example of what's happening:
1) ip = 1.2.3.4, AfterFunc is invoked, 10 second timer is started
2) ip = 4.5.6.7, AfterFunc is invoked, 10 second timer is started
3) The first timer expires, its func is invoked with the current value of the ip variable, device 4.5.6.7 is deleted
4) The second timer expires and its func deletes 4.5.6.7 again
As a result, device with IP 1.2.3.4 never gets deleted.
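The sequence above can be reproduced without timers. Here is a minimal sketch (captureDemo and its variable names are made up for illustration, they are not from your code): closures over a variable that is declared once outside the loop all see the last IP, while closures over a variable declared inside the loop body each keep their own value. It is worth checking which of the two cases your ip falls into.

```go
package main

import "fmt"

// captureDemo is a hypothetical helper: it builds closures over two
// differently scoped variables and reports what each closure sees
// once the loop has finished.
func captureDemo(addrs []string) (shared, fresh []string) {
	var sharedFns, freshFns []func() string

	var ip string // declared once, outside the loop
	for i := range addrs {
		ip = addrs[i] // reassigns the same variable every time
		sharedFns = append(sharedFns, func() string { return ip })

		local := addrs[i] // a new variable on every iteration
		freshFns = append(freshFns, func() string { return local })
	}

	// call the closures only after the loop is done, like an expired timer would
	for _, f := range sharedFns {
		shared = append(shared, f())
	}
	for _, f := range freshFns {
		fresh = append(fresh, f())
	}
	return shared, fresh
}

func main() {
	shared, fresh := captureDemo([]string{"1.2.3.4", "4.5.6.7"})
	fmt.Println(shared) // [4.5.6.7 4.5.6.7]
	fmt.Println(fresh)  // [1.2.3.4 4.5.6.7]
}
```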
You can fix that by creating a function that takes ip as an argument and returns a func() that closes over the value of that argument at the time of the call.
timer = time.AfterFunc(time.Second*10, func(ip string) func() {
	return func() {
		log.Println("time after func", ip)
		delete(timers, ip)
		db.Delete(ip)
	}
}(ip))
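Simpler working example. Note that the first loop only shows the shared-variable behaviour on Go versions before 1.22, where it prints 5 five times; since Go 1.22 each iteration of a for loop gets its own i, so both loops print 0 to 4: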
package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("Capturing value of i at the moment of execution of func()")
	for i := 0; i < 5; i++ {
		afterFuncTimer := time.AfterFunc(time.Second*2, func() {
			fmt.Printf("AfterFunc() with %v\n", i)
		})
		defer afterFuncTimer.Stop()
	}
	time.Sleep(5 * time.Second)

	fmt.Println("Capturing value of i from the loop")
	for i := 0; i < 5; i++ {
		afterFuncTimer := time.AfterFunc(time.Second*2, func(i int) func() {
			return func() {
				fmt.Printf("AfterFunc() with %v\n", i)
			}
		}(i))
		defer afterFuncTimer.Stop()
	}
	time.Sleep(5 * time.Second)
}
Run it on Go Playground: https://play.golang.org/p/bGWzTaWe3ZU
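One more thing that may be worth checking, independent of the capture issue: time.AfterFunc runs its callback in its own goroutine, so in the question's code the timers map is modified both from the read loop and from the callbacks without any lock. Below is a rough sketch of guarding it with a mutex. The names Tracker, Touch, Alive and demo are hypothetical and the timeouts are shortened for the demo; treat it as one possible arrangement, not a drop-in replacement.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Tracker is a hypothetical helper that remembers which devices were
// heard from recently. One mutex guards the map, because AfterFunc
// callbacks run in their own goroutines.
type Tracker struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
}

func NewTracker() *Tracker {
	return &Tracker{timers: make(map[string]*time.Timer)}
}

// Touch records an announcement from ip and (re)starts its expiry timer.
func (t *Tracker) Touch(ip string, ttl time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if old, ok := t.timers[ip]; ok {
		old.Stop()
	}
	var timer *time.Timer
	timer = time.AfterFunc(ttl, func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		// only remove the entry if this timer is still the current one;
		// a callback that lost the race against a newer Touch does nothing
		if t.timers[ip] == timer {
			delete(t.timers, ip)
		}
	})
	t.timers[ip] = timer
}

// Alive reports whether ip still has a pending timer.
func (t *Tracker) Alive(ip string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.timers[ip]
	return ok
}

// demo renews a device once before its timeout and then lets it expire.
func demo() (afterRenew, afterExpiry bool) {
	tr := NewTracker()
	tr.Touch("1.2.3.4", 100*time.Millisecond)
	time.Sleep(60 * time.Millisecond)
	tr.Touch("1.2.3.4", 100*time.Millisecond) // renewed before expiry
	time.Sleep(60 * time.Millisecond)
	afterRenew = tr.Alive("1.2.3.4")
	time.Sleep(100 * time.Millisecond) // no further announcements
	afterExpiry = tr.Alive("1.2.3.4")
	return afterRenew, afterExpiry
}

func main() {
	afterRenew, afterExpiry := demo()
	fmt.Println("alive after renewal:", afterRenew)
	fmt.Println("alive after silence:", afterExpiry)
}
```

In the real program the callback would also call db.Delete(ip), and Touch would be called from the UDP read loop with the timeout the device announced.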