Search code examples
pythonffmpegpipe

Multiple named pipes in ffmpeg


This question is the follow-up of this question

In my application I want to modify various mp3 and then mix them together. I know I could do it with a single command line in FFmpeg but it can end up very messy since I need to use various filter on each sample and I have five of them. My idea is to edit each sample individually, save them into a pipe and finally mix them.

import subprocess
import os

def create_pipes():
    os.mkfifo("pipe1")
    os.mkfifo("pipe2")

    
def create_samp():   
    sample= subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3", \
                            "-af", "adelay=15000|15000", "-f", "mp3", "pipe:pipe1"], stdout=subprocess.PIPE).stdout
    return(sample)

def create_samp_2():   
    sample= subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/370/370934_6399962-lq.ogg", \
                            "-af", "adelay=1000|1000", "-f", "mp3", "pipe:pipe2"], stdout=subprocess.PIPE).stdout
    return(sample)


def record(samp, samp_2):  
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3', \
                                "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3", \
                                "-i", "pipe1", \
                                "-i", "pipe2", \
                                "-filter_complex", "amix=inputs=3:duration=longest", "output.mp3"], stdin=subprocess.PIPE)

    process.stdin.write(samp)  
    process.stdin.write(samp_2)
    process.stdin.close()  
    process.wait()

create_pipes()
samp = create_samp()
samp_2 = create_samp_2()
record(samp, samp_2)

When I run the script, create_samp() and create_samp2() are running fine. But I run record(), the program get stuck with no error message so I can't figure out what the issue is.


Solution

  • Using named pipes (Linux only):

    Named pipes are required when there are two ore more input streams (that need to be piped from memory buffers).
    Using named pipes is not trivial at all...

    From FFmpeg point of view named pipes are like (non-seekable) input files.

    Using named pipes in Python (in Linux):
    Assume pipe1 is the name of the "named pipe" (e.g. pipe1 = "audio_pipe1").

    1. Create a "named pipe":

      os.mkfifo(pipe1)
      
    2. Open the pipe as "write only" file:

      fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer).
      
    3. Write the data to the pipe in small chunks.
      According to this post, the default buffer size of the pipe in most Linux systems is 64KBytes.
      Because the data is larger than 65536 bytes, we need to write the data to the pipe in small chunks.
      I decided to use an arbitrary chunk size of 1024 bytes.
      Pipe writing operation is a "blocking" operation.
      I solved it by using a "writer" thread:

      def writer(data, pipe_name, chunk_size):
          # Open the pipes as opening "low level IO" files (open for "open for writing only").
          fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)
      
          for i in range(0, len(data), chunk_size):
              # Write to named pipe as writing to a "low level IO" file (but write the data in small chunks).
              os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe
      
    4. Close the pipe:

      os.close(fd_pipe)
      
    5. Remove (unlink) the named pipe:

      os.unlink(pipe1)
      

    Here is the sample from the previous post, using two named pipes:

    import subprocess
    import os
    from threading import Thread
    
    
    def create_samp():
        # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
        # Apply adelay audio filter.
        # Encode the audio in mp3 format.
        # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
        sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                                  "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout
    
        # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
        sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                                  "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout
    
        return sample1, sample2
    
    
    def writer(data, pipe_name, chunk_size):
        # Open the pipes as opening files (open for "open for writing only").
        fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)
    
        for i in range(0, len(data), chunk_size):
            # Write to named pipe as writing to a file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe
    
        # Closing the pipes as closing files.
        os.close(fd_pipe)
    
    
    def record(samp1, samp2):
        # Names of the "Named pipes"
        pipe1 = "audio_pipe1"
        pipe2 = "audio_pipe2"
    
        # Create "named pipes".
        os.mkfifo(pipe1)
        os.mkfifo(pipe2)
    
        # Open FFmpeg as sub-process
        # Use two audio input streams:
        # 1. Named pipe: "audio_pipe1"
        # 2. Named pipe: "audio_pipe2"
        # Merge the two audio streams using amix audio filter.
        # Store the result to output file: output.mp3
        process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                    "-i", pipe1,
                                    "-i", pipe2,
                                    "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                    stdin=subprocess.PIPE)
    
        # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
        thread1 = Thread(target=writer, args=(samp1, pipe1, 1024))  # thread1 writes samp1 to pipe1
        thread2 = Thread(target=writer, args=(samp2, pipe2, 1024))  # thread2 writes samp2 to pipe2
    
        # Start the two threads
        thread1.start()
        thread2.start()
    
        # Wait for the two writer threads to finish
        thread1.join()
        thread2.join()
    
        process.wait()  # Wait for FFmpeg sub-process to finish
    
        # Remove the "named pipes".
        os.unlink(pipe1)
        os.unlink(pipe2)
    
    
    sampl1, sampl2 = create_samp()
    record(sampl1, sampl2)
    

    Update:

    Same solution using a class:
    Implementing the solution using a class ("NamedPipeWriter" class) is a bit more elegant.
    The class inherits Thread class, and overrides run method.

    You may create a list of multiple objects, and iterate them in a loop, (instead of duplicating the code for each new input stream).

    Here is the same solution using a class:

    import subprocess
    import os
    import stat
    from threading import Thread
    
    
    def create_samp():
        # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
        # Apply adelay audio filter.
        # Encode the audio in mp3 format.
        # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
        sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                                  "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout
    
        # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
        sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                                  "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout
    
        return sample1, sample2
    
    
    class NamedPipeWriter(Thread):
        """ Write data (in small chunks) to a named pipe using a thread """
    
        def __init__(self, pipe_name, data):
            """ Initialization - get pipe name and data to be written """
            super().__init__()
            self._pipe_name = pipe_name
            self._chunk_size = 1024
            self._data = data
            
    
        def run(self):
            """ Open the pipe, write data in small chunks and close the pipe """
            chunk_size = self._chunk_size
            data = self._data
    
            # Open the pipes as opening files (open for "open for writing only").
            fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)
    
            for i in range(0, len(data), chunk_size):
                # Write to named pipe as writing to a file (but write the data in small chunks).
                os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe
    
            # Closing the pipes as closing files.
            os.close(fd_pipe)
    
    
        
    
    def record(samp1, samp2):
        # Names of the "Named pipes"
        pipe1 = "audio_pipe1"
        pipe2 = "audio_pipe2"
    
        # Create "named pipes".
        if not stat.S_ISFIFO(os.stat(pipe1).st_mode):
            os.mkfifo(pipe1)  # Create the pipe only if not exist.
    
        if not stat.S_ISFIFO(os.stat(pipe2).st_mode):
            os.mkfifo(pipe2)
    
        # Open FFmpeg as sub-process
        # Use two audio input streams:
        # 1. Named pipe: "audio_pipe1"
        # 2. Named pipe: "audio_pipe2"
        # Merge the two audio streams using amix audio filter.
        # Store the result to output file: output.mp3
        process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                    "-i", pipe1,
                                    "-i", pipe2,
                                    "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                    stdin=subprocess.PIPE)
    
        # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
        named_pipe_writer1 = NamedPipeWriter(pipe1, samp1)
        named_pipe_writer2 = NamedPipeWriter(pipe2, samp2)
    
        # Start the two threads
        named_pipe_writer1.start()
        named_pipe_writer2.start()
    
        # Wait for the two writer threads to finish
        named_pipe_writer1.join()
        named_pipe_writer1.join()
    
        process.wait()  # Wait for FFmpeg sub-process to finish
    
        # Remove the "named pipes".
        os.unlink(pipe1)
        os.unlink(pipe2)
    
    
    sampl1, sampl2 = create_samp()
    record(sampl1, sampl2)
    

    Notes:

    • The code was tested in Ubuntu 18.04 (in a virtual machine).