Search code examples
goffmpeg

closing goroutine spawned by fiber endpoint


I have a program that is rtsp cameras to hls format using ffmpeg for streaming. creating goroutines for each rtsp link as ffmpeg runs in background

Streams are added by following code.

func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
    ctx, _ := context.WithCancel(ctx)
    go func() {
        if !getStreams(v.camera_id) {
            var stream StreamState
            stream.camera_id = v.camera_id
            stream.state = true
            go Stream(v, ctx)
            wg.Wait()
        } else {
            return
        }
    }()
}   

}

Streaming function which runs ffmpeg command.

func Stream(meta StreamData, ctx context.Context) error {
    log.Println("Started Streaming")
    ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
    output, _ := ffmpegCmd.CombinedOutput()

    log.Println(string(output))

    for {
        select {
        case <-ctx.Done():
           log.Println("killing process")
           ffmpegCmd.Process.Kill()
           return nil
        }
    }}

my Goal is to stop each os.exec process (ffmpeg command) or at least close all goroutines that are under ffmpeg commands without closing fiber server.

** help required new to golang **


Solution

  • This is working code:

    func StreamProcess(data <-chan StreamData, ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    for {
        select {
        case v, ok := <-data:
            if ok {
                go func() {
                    if !getStreams(v.camera_id) {
                        var stream StreamState
                        stream.camera_id = v.camera_id
                        stream.state = true
                        go Stream(v, ctx)
                    }
                }()
            } else if !ok {
                cancel()
                return
            }
        case <-ctx.Done():
            log.Println("closed ctx")
            cancel()
        }
    
    }
    

    and started streaming with:

    func Stream(meta StreamData, ctx context.Context) error {
    log.Println("Started Streaming")
    err := exec.CommandContext(ctx, "ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id)).Run()
    
    if err != nil {
        log.Println("error in streaming", err)
        return err
    }
    
    log.Println(string("waiting for closure"))
    for {
        select {
        case <-ctx.Done():
            log.Println("killing process")          
            return nil
        case <-time.After(2* time.second):
            log.Println("started default context")
            return nil
        }
    
    }
    

    .

    This works for me now i didn't found any better way. Please comment if anyone is having better way.