Search code examples
python ffmpeg pipe buffer youtube-dl

Python buffered IO ending early streaming with multiple pipes


I'm trying to make a continuous livestream of videos downloaded via yt-dlp. I need to port this (working) bash command into Python.

# Reference pipeline: inside the subshell, each line downloads one video to stdout
# with youtube-dl and pipes it into an ffmpeg that copies the streams (-c copy) into
# MPEG-TS on stdout. The subshell's combined stdout is then piped into the outer
# ffmpeg, which reads it from stdin (-re -i -), encodes the video with libx264 and
# publishes FLV to the RTMP URL.
(
    youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
    youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
) | ffmpeg -re -i - -c:v libx264 -f flv rtmp://127.0.0.1/live/H1P_x5WPF

My Python attempt is cutting off the last ~2 seconds of each video. My suspicion is that although the first pipe, yt-dlp, has an empty stdout, there is still data travelling between the second and third pipe. I haven't been able to figure out a way to properly handle the data between those two pipes at the end of the video.

# Attempted Python port of the shell pipeline: for every playlist entry, copy
# yt-dlp's stdout into a per-video ffmpeg ("encoder") and copy that encoder's
# stdout into one long-lived ffmpeg that publishes to RTMP.
from subprocess import Popen, PIPE, DEVNULL

# Size argument (in bytes) passed to every read() call below.
COPY_BUFSIZE = 65424

playlist = [
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
]

if __name__ == "__main__":
    # Final stage: reads from stdin ("-i -"), encodes the video with libx264
    # and sends FLV to the RTMP URL.
    stream_cmd = [
        "ffmpeg", "-loglevel", "error",
        "-hide_banner", "-re", "-i", "-",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        "rtmp://127.0.0.1/live/H1P_x5WPF"
    ]
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    # Per-video stage: reads from stdin and copies the streams ("-c copy")
    # into MPEG-TS written to stdout.
    # NOTE(review): unlike the inner ffmpeg of the shell pipeline, this command
    # also passes "-re" - confirm that is intended.
    encoder_cmd = [
        "ffmpeg", "-re", "-i", "-", "-f", "mpegts",
        "-c", "copy", "-"
    ]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    # One streaming process is started once and shared by all playlist entries.
    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for video in playlist:
        # Download the video and write it to stdout ("-o -").
        yt_dlp_cmd = [
            "yt-dlp", "-q",
            video["url"],
            "-o", "-"
        ]

        print("Now playing: " + video["url"])

        # A fresh yt-dlp and a fresh encoder process are started per video.
        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
            with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
                # Each iteration does one read from yt-dlp, one write to the
                # encoder, one read from the encoder and one write to the stream.
                while True:
                    yt_dlp_buf = yt_dlp_p.stdout.read(COPY_BUFSIZE)
                    print("READ: yt_dlp")
                    if not yt_dlp_buf:
                        print("yt-dlp buffer empty")
                        # Handle any data in 2nd/3rd pipes before breaking?
                        # NOTE(review): the loop ends here without closing
                        # encoder_p.stdin and without reading encoder_p.stdout
                        # again, so output the encoder has not yet delivered is
                        # never forwarded to stream_p.
                        break

                    written = encoder_p.stdin.write(yt_dlp_buf)
                    print("WRITE: encoder. Bytes: " + str(written))

                    # NOTE(review): this requests COPY_BUFSIZE bytes right after
                    # a single write; presumably it blocks until the encoder has
                    # produced that much output or closed its stdout - confirm.
                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                    # if not encoder_buf:
                    #     print("encoder_buf empty")
                    #     break
                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

Running Python 3.6.9 on MacOS.


Solution

  • Closing the stdin pipe is required to "push" the sub-process's remaining (buffered) data to its stdout pipe.

    For example, add encoder_p.stdin.close() after you finish writing all the data to encoder_p.stdin.


    I don't understand how your code is working.
    On my machine, it gets stuck at encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE).

    I solved the problem using a "writer thread".
    The "writer thread" reads data from yt_dlp_p and writes it to encoder_p.stdin.

    Note: In your specific case, it could work without a thread (because the data is just passed through FFmpeg, and not being encoded), but usually, the encoded data is not ready right after writing the input to FFmpeg.


    My code sample uses FFplay sub-process for playing the video (we need the video player because the RTMP streaming requires a "listener" in order to keep streaming).


    Here is a complete code sample:

    from subprocess import Popen, PIPE, DEVNULL
    import threading
    import time
    
    # Size argument (in bytes) used for the read() calls that move data between pipes.
    COPY_BUFSIZE = 65424

    # Example playlist: every entry points at the same 15 second video.
    playlist = [
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
    ]
    
    
    
    # Writer thread (read from yt-dlp and write to FFmpeg in chunks of COPY_BUFSIZE bytes).
    def writer(yt_dlp_proc, encoder_proc, bufsize=None):
        """Copy yt-dlp's stdout into the encoder's stdin, then finish the encoder.

        Intended to run in its own thread so that another thread can read the
        encoder's stdout at the same time.

        Args:
            yt_dlp_proc: Process object whose ``stdout`` is read until it is empty.
            encoder_proc: Process object whose ``stdin`` receives the data.
            bufsize: Number of bytes requested per read. Defaults to
                ``COPY_BUFSIZE`` when omitted.

        Raises:
            Whatever the read or write raises (for example ``BrokenPipeError``
            when the encoder exits early). The encoder's stdin is still closed
            and the encoder is still waited on before the error propagates.
        """
        if bufsize is None:
            bufsize = COPY_BUFSIZE

        try:
            while True:
                yt_dlp_buf = yt_dlp_proc.stdout.read(bufsize)
                print("READ: yt_dlp")
                if not yt_dlp_buf:
                    print("yt-dlp buffer empty")
                    break

                written = encoder_proc.stdin.write(yt_dlp_buf)
                print("WRITE: encoder. Bytes: " + str(written))
        finally:
            # Always close stdin, even when the loop failed: closing it is what
            # lets the encoder push its remaining data to stdout and exit, so
            # a reader of the encoder's stdout is not left waiting.
            try:
                encoder_proc.stdin.close()
            except BrokenPipeError:
                # The encoder is already gone; there is nothing left to flush.
                pass
            encoder_proc.wait()  # Wait for the sub-process to finish.
    
    
    if __name__ == "__main__":
        rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

        # FFplay receives the RTMP video. It is the listening side ("-listen 1"),
        # so it is launched before the FFmpeg process that sends to it.
        ffplay_process = Popen(['ffplay', '-listen', '1', '-i', rtmp_url], stderr=DEVNULL)

        # Streaming stage: read from stdin, encode the video with libx264 and
        # send FLV to rtmp_url.
        rate_control = [
            "-b:v", "3000k", "-minrate", "3000k",
            "-maxrate", "3000k", "-bufsize", "3000k",
        ]
        stream_cmd = (
            ["ffmpeg", "-loglevel", "error", "-hide_banner"]
            + ["-re", "-i", "-"]
            + ["-c:v", "libx264", "-f", "flv"]
            + rate_control
            + ["-r", "25", "-pix_fmt", "yuv420p", rtmp_url]
        )
        print(f'Stream command:\n"{" ".join(stream_cmd)}"')

        # Per-video stage: read from stdin and copy the streams into MPEG-TS on stdout.
        encoder_cmd = ["ffmpeg", "-re", "-i", "-", "-f", "mpegts", "-c", "copy", "-"]
        print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

        # A single streaming process is shared by every playlist entry.
        stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

        for video in playlist:
            url = video["url"]
            yt_dlp_cmd = ["yt-dlp", "-q", url, "-o", "-"]

            print("Now playing: " + url)

            with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p, \
                    Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:

                # The writer thread feeds the encoder while this thread drains it.
                thread = threading.Thread(target=writer, args=(yt_dlp_p, encoder_p))
                thread.start()

                # Forward encoder output to the streaming process until a read
                # comes back empty.
                encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                while encoder_buf:
                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                print("encoder_buf empty")

            thread.join()  # Wait for the writer thread to end.
            yt_dlp_p.wait()

        # Closing stdin lets the streaming process push out its remaining data and exit.
        stream_p.stdin.close()
        stream_p.wait()

        time.sleep(3)  # Wait 3 seconds before closing FFplay.
        ffplay_process.kill()  # Forcefully close the FFplay sub-process.
    

    Update:

    I found a simpler solution using pytube and concat filter (without pipes).

    I don't know if the solution is relevant for you...

    Code sample:

    from pytube import YouTube
    from subprocess import Popen, run, PIPE, DEVNULL
    import time

    # Example playlist: every entry points at the same 15 second video.
    playlist = [
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
        {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},
    ]

    n = len(playlist)

    # Build the -filter_complex graph for FFmpeg's concat filter
    # (https://video.stackexchange.com/a/18256/18277).
    # Step 1: reset the timestamps of each input's video and audio, e.g. for input 0:
    #   "[0:v]setpts=PTS-STARTPTS[v0];[0:a]asetpts=PTS-STARTPTS[a0];"
    reset_pts = ''.join(
        f'[{idx}:v]setpts=PTS-STARTPTS[v{idx}];[{idx}:a]asetpts=PTS-STARTPTS[a{idx}];'
        for idx in range(n)
    )
    # Step 2: list the relabelled pads in playback order, e.g. "[v0][a0][v1][a1][v2][a2]",
    # and feed them to concat.
    concat_inputs = ''.join(f'[v{idx}][a{idx}]' for idx in range(n))
    filter_complex_str = reset_pts + concat_inputs + f'concat=n={n}:v=1:a=1[v][a]'

    # Resolve every YouTube page URL to a stream URL and put "-i" in front of
    # each one so the list can be used directly as FFmpeg inputs.
    playlist_url = []
    for video in playlist:
        # https://github.com/pytube/pytube/issues/301
        # NOTE(review): streams[0] is simply the first stream pytube lists; the
        # filter graph above needs both video and audio in each input - confirm.
        first_stream = YouTube(video["url"]).streams[0]
        playlist_url += ['-i', first_stream.url]

    rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

    # FFplay receives the RTMP video. It is the listening side ("-listen 1"),
    # so it is launched before the FFmpeg process that sends to it.
    ffplay_process = Popen(['ffplay', '-listen', '1', '-i', rtmp_url], stderr=DEVNULL)

    stream_cmd = [
        "ffmpeg", "-loglevel", "error", "-hide_banner", "-re",
        *playlist_url,
        "-filter_complex", filter_complex_str,
        "-map", "[v]", "-map", "[a]",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        rtmp_url,
    ]

    run(stream_cmd)

    time.sleep(60)  # Wait 60 seconds before closing FFplay.
    ffplay_process.kill()  # Forcefully close the FFplay sub-process.