I have a program that is rtsp cameras to hls format using ffmpeg for streaming. creating goroutines for each rtsp link as ffmpeg runs in background
Streams are added by following code.
func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
ctx, _ := context.WithCancel(ctx)
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
wg.Wait()
} else {
return
}
}()
}
}
Streaming function which runs ffmpeg command.
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
output, _ := ffmpegCmd.CombinedOutput()
log.Println(string(output))
for {
select {
case <-ctx.Done():
log.Println("killing process")
ffmpegCmd.Process.Kill()
return nil
}
}}
my Goal is to stop each os.exec process (ffmpeg command) or at least close all goroutines that are under ffmpeg commands without closing fiber server.
** help required new to golang **
This is working code:
func StreamProcess(data <-chan StreamData, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case v, ok := <-data:
if ok {
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
}
}()
} else if !ok {
cancel()
return
}
case <-ctx.Done():
log.Println("closed ctx")
cancel()
}
}
and started streaming with:
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
err := exec.CommandContext(ctx, "ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id)).Run()
if err != nil {
log.Println("error in streaming", err)
return err
}
log.Println(string("waiting for closure"))
for {
select {
case <-ctx.Done():
log.Println("killing process")
return nil
case <-time.After(2* time.second):
log.Println("started default context")
return nil
}
}
.
This works for me now i didn't found any better way. Please comment if anyone is having better way.