http · asynchronous · go · apache-kafka · goroutine

Async work after response


I am trying to implement an HTTP server that:

  • Calculates the redirect target using some logic
  • Redirects the user
  • Logs user data

The goal is maximum throughput (at least 15k rps). To achieve this, I want to write logs asynchronously. I'm using Kafka as the logging system and have moved the logging code into a separate goroutine. A simplified version of the current implementation:

package main

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "net/http"
    "time"
    "encoding/json"
)

type log struct {
    RuntimeParam  string `json:"runtime_param"`
    AsyncParam    string `json:"async_param"`
    RemoteAddress string `json:"remote_address"`
}

var (
    producer, _ = kafka.NewProducer(&kafka.ConfigMap{ // error ignored for brevity
        "bootstrap.servers": "localhost:9092,localhost:9093",
        "queue.buffering.max.ms": 1 * 1000,
        "go.delivery.reports": false,
        "client.id": 1,
    })
    topicName = "log"
)

func main() {
    siteMux := http.NewServeMux()
    siteMux.HandleFunc("/", httpHandler)
    srv := &http.Server{
        Addr: ":8080",
        Handler: siteMux,
        ReadTimeout:  2 * time.Second,
        WriteTimeout: 5 * time.Second,
        IdleTimeout:  10 * time.Second,
    }
    if err := srv.ListenAndServe(); err != nil {
        panic(err)
    }
}

func httpHandler(w http.ResponseWriter, r *http.Request) {
    handlerLog := new(log)
    handlerLog.RuntimeParam = "runtimeDataString"
    http.Redirect(w, r, "http://google.com", http.StatusMovedPermanently)
    go func(goroutineLog *log, request *http.Request) {
        goroutineLog.AsyncParam = "asyncDataString"
        goroutineLog.RemoteAddress = request.RemoteAddr
        jsonLog, err := json.Marshal(goroutineLog)
        if err == nil {
            producer.ProduceChannel() <- &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
                Value:          jsonLog,
            }
        }
    }(handlerLog, r)
}

The questions are:

  1. Is it correct/efficient to use a separate goroutine for async logging, or should I use a different approach (workers and channels, for example)?
  2. Is there a way to further improve the server's performance that I'm missing?

Solution

    1. Yes, this is a correct and efficient use of a goroutine (as Flimzy pointed out in the comments). I couldn't agree more; this is a good approach.

    The problem is that the handler may finish before the goroutine has started processing, so the request (which is a pointer) may already be recycled, or you may hit races further down the middleware stack. I read in your comments that this isn't your case, but in general you shouldn't pass a request to a goroutine. As far as I can see, you only use RemoteAddr from the request, so why not redirect straight away and put the logging in a defer statement? I'd rewrite your handler a bit:

    func httpHandler(w http.ResponseWriter, r *http.Request) {
        http.Redirect(w, r, "http://google.com", http.StatusMovedPermanently)
        defer func(runtimeDataString, RemoteAddr string) {
                handlerLog := new(log)
                handlerLog.RuntimeParam = runtimeDataString
                handlerLog.AsyncParam = "asyncDataString"
                handlerLog.RemoteAddress = RemoteAddr
                jsonLog, err := json.Marshal(handlerLog)
                if err == nil {
                    producer.ProduceChannel() <- &kafka.Message{
                        TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
                        Value:          jsonLog,
                    }
                }
            }("runtimeDataString", r.RemoteAddr)
    }  
    
    2. The goroutines are unlikely to improve your server's performance, since they only let you send the response earlier; meanwhile, pending Kafka messages can pile up in the background and slow down the whole server. If you find this to be the bottleneck, consider saving logs locally and shipping them to Kafka from another process (or a pool of workers) outside of your server. That spreads the workload over time (e.g. sending fewer logs while you're handling more requests, and vice versa).
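
    The "pool of workers" idea from point 2 can be sketched roughly as follows. This is a minimal, self-contained illustration, not a drop-in implementation: the logEntry type, channel capacity, worker count, and the sink callback (standing in for the Kafka produce call) are all illustrative assumptions.

    ```go
    package main

    import (
        "fmt"
        "sync"
    )

    // logEntry is a stand-in for the JSON log payload that would go to Kafka.
    type logEntry struct {
        RemoteAddress string
    }

    // startWorkers launches n workers that drain entries from ch and hand
    // each one to sink. In the real server, sink would produce to Kafka.
    func startWorkers(n int, ch <-chan logEntry, sink func(logEntry)) *sync.WaitGroup {
        var wg sync.WaitGroup
        for i := 0; i < n; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for e := range ch {
                    sink(e)
                }
            }()
        }
        return &wg
    }

    func main() {
        // Buffered so handlers rarely block when enqueueing a log entry.
        ch := make(chan logEntry, 1024)

        var mu sync.Mutex
        count := 0
        wg := startWorkers(4, ch, func(e logEntry) {
            mu.Lock()
            count++ // the real sink would send e to producer.ProduceChannel()
            mu.Unlock()
        })

        // A handler would just do: ch <- logEntry{RemoteAddress: r.RemoteAddr}
        for i := 0; i < 100; i++ {
            ch <- logEntry{RemoteAddress: fmt.Sprintf("10.0.0.%d", i)}
        }

        // On server shutdown: close the channel and wait so no logs are lost.
        close(ch)
        wg.Wait()

        fmt.Println("processed", count) // prints: processed 100
    }
    ```

    With this shape, each handler's logging cost shrinks to a single channel send, and the fixed-size worker pool bounds how much concurrent Kafka work can pile up behind the scenes.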