So I have some kind of event queues and several goroutines which are getting the events from their corresponding queues in an infinite loop, process them, and send results into a channel. Different queues may give you the same event, so I need to make sure that each event is sent to channel exactly once, and any occurence of that message in another queue will be ignored. I believe that's more of an architectural issue but I can't figure out how to handle this properly.
Simplified version of my current code is below.
Goroutines that get and handle incoming events look somewhat like this:
func (q *Queue) ProcessEvents(handler Handler) {
lastEvent = 0
for {
events = getEvents(lastEvent)
for _, e := range events {
if e.ID > lastEvent {
lastEvent = event.ID
}
handler.Handle(e)
}
}
}
Handler:
type Handler struct {
c chan Event
}
func (h *Handler) Handle(event *Event) {
//event processing omitted
h.c <- event //Now it just sends a processed event into the channel no matter what.
}
And in main() I do
func main() {
msgc := make(chan Event)
for _, q := range queues {
go func(queue Queue) {
queue.ProcessEvents(&Handler{msgc})
}
}
}
So you represent your current architecture as follows:
With this type of solution the Generators need to check a shared resource to see if an event was already emitted. This might look something like this:
var hasEmmited map[string]bool
var lock sync.Mutex
func HasEmitted(event e) bool {
lock.Lock()
defer lock.Unlock()
e,ok := hasEmmited[e.ID]
return e && ok
}
func SetEmmited(event e) {
lock.Lock()
defer lock.Unlock()
hasEmmited[e.ID] = true
}
This requires locking/unlocking, which even in the best case scenario with no contention is till a great over-head considering the small amount of work being done in the critical section.
With a small change in the architecture, like in the second diagram, it would be possible for one go-routine to to do the filtering without any locking.
Some commenters have said that designing solutions using go-routines is the same as designing for single-threaded applications. I do not believe this is the case. I would suggest looking at:
Golang related messaging: https://blog.golang.org/pipelines
Some message handling design patterns: http://www.enterpriseintegrationpatterns.com/
The enterprise integration patterns might look out of place here, but it covers a lot of message passing patters that also applies in go.