go error-handling channel goroutine ticker

How can I use a ticker to periodically load all files into memory from a path that changes frequently?


I have an application that needs to read files from two different paths. After reading all these files, I need to load them into memory in a products map.

Paths:

  • Full: This path holds all the files that we need to load into memory during server startup. It has around 50 files, each ~60MB in size.
  • Delta: This path holds all the delta files that we need to load into memory every minute. These files contain only the differences from the files in the full path. It has around 60 files, each ~20MB in size.

The code below, watchDeltaPath, is called during server startup to watch for delta changes. It gets the delta path from the GetDeltaPath method, and I need to load all the files from that path into memory. The delta path changes every few minutes, and I cannot miss any delta path or any of the files in it.

Loading all the files into memory with the loadAllFiles method can take some time (approximately 5 minutes), so I am looking for a way to avoid missing any new delta path (as it changes every few minutes) while still loading all the files from each delta path into memory periodically, reliably, and efficiently.

The code below runs every minute, looks up the new delta path each time, and then loads all the files from that path into memory. It works, but I don't think it is the right approach. What happens if loadAllFiles takes more than 10 minutes while my ticker fires every minute to look for a new delta path and load its files? Will it keep creating a lot of background threads and maybe increase CPU usage significantly?

type applicationRepository struct {
  client         customer.Client
  logger         log.Logger
  done           chan struct{}
  products       *cmap.ConcurrentMap
}

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
    ticker := time.NewTicker(1 * time.Minute)
    go func() {
        // loop so that every tick is handled, not only the first one
        for {
            select {
            case <-r.done:
                ticker.Stop()
                return
            case <-ticker.C:
                func() (result error) {
                    trans := r.logger.StartTransaction(nil, "delta-changes", "")
                    defer trans.End()
                    defer func() {
                        if result != nil {
                            trans.Errorf("Recovered from error: %v", result)
                        } else if err := recover(); err != nil {
                            trans.Errorf("Recovered from panic: %v", err)
                        }
                    }()
                    // get the latest delta path every time, as it changes every few minutes
                    path, err := r.client.GetDeltaPath("delta")
                    if err != nil {
                        return err
                    }
                    // load all the files from that path into the in-memory "products" map
                    err = r.loadAllFiles(path)
                    if err != nil {
                        return err
                    }
                    return nil
                }()
            }
        }
    }()
    return nil
}

func (r *applicationRepository) Stop() {
    r.done <- struct{}{}
}

What is the best way to do this efficiently in prod?

Here is a playground version of the code showing how it runs: https://go.dev/play/p/FS4-B0FWwTe


Solution

  • As the comments note, the "best way to do this efficiently in prod" depends on many factors and is probably not answerable on a site like Stack Overflow. That said, I can suggest an approach that might make the problem easier to reason about.

    The code below (playground; pretty rough and untested) demonstrates an approach with three goroutines:

    1. Detects new delta paths and pushes them to a buffered channel
    2. Handles the initial load
    3. Waits for the initial load to finish, then applies deltas (note that this does process deltas found while the initial load is underway)

    As mentioned above, there is insufficient detail in the question to ascertain whether this is a good approach. It may be that the initial load and the deltas can run simultaneously without saturating the IO, but that would require testing (and would be a relatively small change).

    // Simulation of process to perform initial load and handle deltas
    package main
    
    import (
        "fmt"
        "strconv"
        "sync"
        "time"
    )
    
    const deltaBuffer = 100
    const initialLoadTime = 1500 * time.Millisecond
    const deltaCheckFrequency = 500 * time.Millisecond
    
    func main() {
        ar := NewApplicationRepository()
        time.Sleep(5 * time.Second)
        ar.Stop()
        fmt.Println(time.Now(), "complete")
    }
    
    type applicationRepository struct {
        deltaChan       chan string   // Could be some other type...
        initialLoadDone chan struct{} // Closed when initial load finished
    
        done chan struct{}
        wg   sync.WaitGroup
    }
    
    func NewApplicationRepository() *applicationRepository {
        ar := applicationRepository{
            deltaChan:       make(chan string, deltaBuffer),
            initialLoadDone: make(chan struct{}),
            done:            make(chan struct{}),
        }
    
        ar.wg.Add(3)
        go ar.detectNewDeltas()
        go ar.initialLoad()
        go ar.deltaLoad()
    
        return &ar
    }
    
    // detectNewDeltas - watch for new delta paths
    func (a *applicationRepository) detectNewDeltas() {
        defer a.wg.Done()
        var previousDelta string
        for {
            select {
            case <-time.After(deltaCheckFrequency):
                dp := a.getDeltaPath()
                if dp != previousDelta {
                    select {
                    case a.deltaChan <- dp:
                    default:
                        panic("channel full - no idea what to do here!")
                    }
                    previousDelta = dp
                }
            case <-a.done:
                return
            }
        }
    }
    
    // getDeltaPath in real application this will retrieve the delta path
    func (a *applicationRepository) getDeltaPath() string {
        return strconv.Itoa(time.Now().Second()) // For now just return the current second..
    }
    
    // initialLoad - load the initial data
    func (a *applicationRepository) initialLoad() {
        defer a.wg.Done()
        defer close(a.initialLoadDone)
        time.Sleep(initialLoadTime) // Simulate time taken for initial load
    }
    
    // deltaLoad - load deltas found by detectNewDeltas
    func (a *applicationRepository) deltaLoad() {
        defer a.wg.Done()
        fmt.Println(time.Now(), "deltaLoad started")
    
        // Wait for initial load to complete before doing anything
        <-a.initialLoadDone
        fmt.Println(time.Now(), "Initial Load Done")
    
        // Wait for incoming deltas and load them
        for {
            select {
            case newDelta := <-a.deltaChan:
                fmt.Println(time.Now(), newDelta)
            case <-a.done:
                return
            }
        }
    }
    
    // Stop - signal loader to stop and wait until this is done
    func (a *applicationRepository) Stop() {
        close(a.done)
        a.wg.Wait()
    }