go, concurrency, parallel-processing, priority-queue, goroutine

How to create a shared queue in Go?


I am trying to implement the least-connections algorithm for a load balancer. I am using a priority queue to keep the servers sorted by their current connection count. Here is the code:

server = spq[0]
serverNumber = server.value

updatedPriority = server.priority + 1 // Increment connection count for server

spq.update(server, serverNumber, updatedPriority)

targetUrl, err := url.Parse(configuration.Servers[serverNumber])
if err != nil {
    log.Fatal(err)
}

// Send the request to the selected server
httputil.NewSingleHostReverseProxy(targetUrl).ServeHTTP(w, r)

updatedPriority = server.priority - 1 // Decrement connection count for server
spq.update(server, serverNumber, updatedPriority)

where spq is my priority queue.

This code runs for every request the balancer receives. When I log the state of the queue on each request, the results are not correct. For example, in one case the queue contained the same server twice with different priorities.

I am sure this has something to do with synchronising and locking the queue across requests, but I am not sure what the correct approach is in this particular case.


Solution

  • If this is really the code that runs in multiple goroutines, then you clearly have a data race.

    I do not understand spq.update. At first it looks like it is a function that reorders the queue to have the server with minimum number of calls at element 0, but then why does it need both server and serverNumber? serverNumber appears to be a unique ID for the server, and since you already have the server, why do you need that?

    In any case, you should have a sync.Mutex shared by all goroutines. Lock it before the first line and unlock it after the first spq.update; then lock it again after the proxy call and unlock it once the second spq.update is done. Do not hold the lock during the proxy call itself, or requests will be served one at a time. Also, the line that subtracts 1 from server.priority only works if server is a pointer. If it is not a pointer, you lose every update made to that server while the proxy call was in progress.
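
The locking pattern described above could be sketched roughly as follows. This is a minimal illustration, not the asker's actual spq: the server, serverHeap, and balancer types and the acquire/release helpers are hypothetical names, and the queue is assumed to be built on the standard container/heap package with pointer elements so that updates made by other goroutines are visible.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// server is a hypothetical queue item; conns plays the role of "priority".
type server struct {
	id    int // index into the configured server list
	conns int // number of in-flight requests
	index int // position in the heap, kept current by Swap and Push
}

// serverHeap is a min-heap of *server ordered by connection count.
type serverHeap []*server

func (h serverHeap) Len() int           { return len(h) }
func (h serverHeap) Less(i, j int) bool { return h[i].conns < h[j].conns }
func (h serverHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}
func (h *serverHeap) Push(x interface{}) {
	s := x.(*server)
	s.index = len(*h)
	*h = append(*h, s)
}
func (h *serverHeap) Pop() interface{} {
	old := *h
	s := old[len(old)-1]
	*h = old[:len(old)-1]
	return s
}

// balancer guards the heap with a single mutex shared by all goroutines.
type balancer struct {
	mu sync.Mutex
	h  serverHeap
}

func newBalancer(n int) *balancer {
	b := &balancer{}
	for i := 0; i < n; i++ {
		b.h = append(b.h, &server{id: i, index: i})
	}
	heap.Init(&b.h)
	return b
}

// acquire picks the least-loaded server and counts the new connection.
func (b *balancer) acquire() *server {
	b.mu.Lock()
	defer b.mu.Unlock()
	s := b.h[0]
	s.conns++
	heap.Fix(&b.h, s.index)
	return s
}

// release counts the connection as finished and restores heap order.
func (b *balancer) release(s *server) {
	b.mu.Lock()
	defer b.mu.Unlock()
	s.conns--
	heap.Fix(&b.h, s.index)
}

func main() {
	b := newBalancer(3)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s := b.acquire()
			// The reverse-proxy call would go here, outside the lock.
			b.release(s)
		}()
	}
	wg.Wait()
	total := 0
	for _, s := range b.h {
		total += s.conns
	}
	fmt.Println(len(b.h), total) // prints "3 0"
}
```

In a handler, acquire would be called before ServeHTTP and release after it (for example via defer), so the mutex is held only while the queue is being read or reordered. Running the program with go run -race is one way to check that no unsynchronised access remains.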