This question is a follow-up to a previous question.
In my application I want to modify several MP3 files and then mix them together. I know I could do it with a single FFmpeg command line, but that can get very messy, since I need to apply several filters to each sample and I have five of them. My idea is to edit each sample individually, save each one into a pipe, and finally mix them.
import subprocess
import os

def create_pipes():
    os.mkfifo("pipe1")
    os.mkfifo("pipe2")

def create_samp():
    sample = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                             "-af", "adelay=15000|15000", "-f", "mp3", "pipe:pipe1"], stdout=subprocess.PIPE).stdout
    return sample

def create_samp_2():
    sample = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/370/370934_6399962-lq.ogg",
                             "-af", "adelay=1000|1000", "-f", "mp3", "pipe:pipe2"], stdout=subprocess.PIPE).stdout
    return sample

def record(samp, samp_2):
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                                "-i", "pipe1",
                                "-i", "pipe2",
                                "-filter_complex", "amix=inputs=3:duration=longest", "output.mp3"], stdin=subprocess.PIPE)
    process.stdin.write(samp)
    process.stdin.write(samp_2)
    process.stdin.close()
    process.wait()

create_pipes()
samp = create_samp()
samp_2 = create_samp_2()
record(samp, samp_2)
When I run the script, create_samp() and create_samp_2() run fine. But when I run record(), the program gets stuck with no error message, so I can't figure out what the issue is.
Using named pipes (Linux only):
Named pipes are required when there are two or more input streams that need to be piped from memory buffers.
Using named pipes is not trivial at all.
From FFmpeg's point of view, named pipes are like (non-seekable) input files.
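To make the "non-seekable" remark concrete, here is a minimal sketch (Linux only) comparing a FIFO with a regular file. The helper names fifo_is_seekable and demo are made up for this illustration, and the files live in a temporary directory rather than in the working directory.

```python
import errno
import os
import tempfile


def fifo_is_seekable(path):
    """Return True if the object at `path` supports lseek, False for a pipe."""
    # O_NONBLOCK lets the read-only open return immediately even though
    # no writer has opened the FIFO yet.
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        os.lseek(fd, 0, os.SEEK_SET)
        return True
    except OSError as exc:
        if exc.errno == errno.ESPIPE:  # "Illegal seek": pipes cannot seek
            return False
        raise
    finally:
        os.close(fd)


def demo():
    """Return (seekable?) for a named pipe and for a regular file."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        fifo_path = os.path.join(tmp_dir, "audio_pipe1")
        file_path = os.path.join(tmp_dir, "regular.bin")
        os.mkfifo(fifo_path)
        with open(file_path, "wb") as f:
            f.write(b"\x00" * 16)
        return fifo_is_seekable(fifo_path), fifo_is_seekable(file_path)


if __name__ == "__main__":
    print(demo())
```

A reader of a named pipe can only consume the bytes in order, which is why the data has to be fed to FFmpeg as a stream.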
Using named pipes in Python (in Linux):
Assume pipe1 is the name of the "named pipe" (e.g. pipe1 = "audio_pipe1").
Create a "named pipe":
os.mkfifo(pipe1)
Open the pipe as "write only" file:
fd_pipe = os.open(pipe1, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer).
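Write the data to the pipe in small chunks:
According to this post, the default buffer size of a pipe on most Linux systems is 64 KiB (65536 bytes).
Because the data is larger than 65536 bytes, it does not fit in the pipe buffer at once, so we write it to the pipe in small chunks.
I chose an arbitrary chunk size of 1024 bytes.
Writing to a pipe is a "blocking" operation: opening the pipe for writing waits until a reader (here FFmpeg) opens the other end, and a write waits whenever the pipe buffer is full.
To keep the main program from stalling, I used a "writer" thread: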
def writer(data, pipe_name, chunk_size):
    # Open the pipe as a "low level IO" file (open for writing only).
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)

    for i in range(0, len(data), chunk_size):
        # Write to the named pipe as writing to a "low level IO" file (but write the data in small chunks).
        os.write(fd_pipe, data[i:chunk_size+i])  # Write up to chunk_size (1024) bytes of data to fd_pipe
Close the pipe:
os.close(fd_pipe)
Remove (unlink) the named pipe:
os.unlink(pipe1)
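The steps above (create, open, write in chunks, close, unlink) can be tried without FFmpeg. The following is a minimal sketch in which the main thread plays the role of FFmpeg by reading the pipe until end-of-file. The function round_trip and the temporary directory are assumptions added for the illustration; the inner while loop is an extra safety measure for the case where os.write accepts fewer bytes than requested.

```python
import os
import tempfile
from threading import Thread


def writer(data, pipe_name, chunk_size=1024):
    # os.open blocks here until some process opens the FIFO for reading.
    fd_pipe = os.open(pipe_name, os.O_WRONLY)
    try:
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]
            # os.write returns the number of bytes actually written,
            # so keep writing until the whole chunk has been sent.
            while chunk:
                written = os.write(fd_pipe, chunk)
                chunk = chunk[written:]
    finally:
        os.close(fd_pipe)  # closing the pipe signals end-of-file to the reader


def round_trip(data):
    """Send `data` through a named pipe and return what the reader received."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        pipe1 = os.path.join(tmp_dir, "audio_pipe1")
        os.mkfifo(pipe1)  # create the named pipe

        thread = Thread(target=writer, args=(data, pipe1))
        thread.start()

        # Stand-in for FFmpeg: open the pipe like a file and read until EOF.
        with open(pipe1, "rb") as reader:
            received = reader.read()

        thread.join()
        os.unlink(pipe1)  # remove the named pipe
        return received


if __name__ == "__main__":
    payload = bytes(range(256)) * 1024  # 256 KiB, larger than the pipe buffer
    print(round_trip(payload) == payload)
```

If the writer ran in the main thread before any reader existed, the os.open call would wait forever, which matches the "stuck with no error message" symptom.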
Here is the sample from the previous post, using two named pipes:
import subprocess
import os
from threading import Thread


def create_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample1 (a bytes object).
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


def writer(data, pipe_name, chunk_size):
    # Open the pipe as opening a file (open for writing only).
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)

    for i in range(0, len(data), chunk_size):
        # Write to the named pipe as writing to a file (but write the data in small chunks).
        os.write(fd_pipe, data[i:chunk_size+i])  # Write up to chunk_size (1024) bytes of data to fd_pipe

    # Close the pipe as closing a file.
    os.close(fd_pipe)


def record(samp1, samp2):
    # Names of the "named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    # Create "named pipes".
    os.mkfifo(pipe1)
    os.mkfifo(pipe2)

    # Open FFmpeg as sub-process
    # Use two audio input streams:
    # 1. Named pipe: "audio_pipe1"
    # 2. Named pipe: "audio_pipe2"
    # Merge the two audio streams using amix audio filter.
    # Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                               stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to a named pipe in chunks of 1024 bytes).
    thread1 = Thread(target=writer, args=(samp1, pipe1, 1024))  # thread1 writes samp1 to pipe1
    thread2 = Thread(target=writer, args=(samp2, pipe2, 1024))  # thread2 writes samp2 to pipe2

    # Start the two threads
    thread1.start()
    thread2.start()

    # Wait for the two writer threads to finish
    thread1.join()
    thread2.join()

    process.wait()  # Wait for FFmpeg sub-process to finish

    # Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)
Same solution using a class:
Implementing the solution using a class (the NamedPipeWriter class) is a bit more elegant.
The class inherits from the Thread class and overrides the run method.
You can create a list of multiple objects and iterate over them in a loop (instead of duplicating the code for each new input stream).
Here is the same solution using a class:
import subprocess
import os
import stat
from threading import Thread


def create_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample1 (a bytes object).
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


class NamedPipeWriter(Thread):
    """ Write data (in small chunks) to a named pipe using a thread """

    def __init__(self, pipe_name, data):
        """ Initialization - get pipe name and data to be written """
        super().__init__()
        self._pipe_name = pipe_name
        self._chunk_size = 1024
        self._data = data

    def run(self):
        """ Open the pipe, write data in small chunks and close the pipe """
        chunk_size = self._chunk_size
        data = self._data

        # Open the pipe as opening a file (open for writing only).
        fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)

        for i in range(0, len(data), chunk_size):
            # Write to the named pipe as writing to a file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write up to chunk_size (1024) bytes of data to fd_pipe

        # Close the pipe as closing a file.
        os.close(fd_pipe)


def record(samp1, samp2):
    # Names of the "named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    # Create "named pipes" (only if they do not exist yet).
    # os.stat raises FileNotFoundError for a missing path, so check existence first.
    if not (os.path.exists(pipe1) and stat.S_ISFIFO(os.stat(pipe1).st_mode)):
        os.mkfifo(pipe1)

    if not (os.path.exists(pipe2) and stat.S_ISFIFO(os.stat(pipe2).st_mode)):
        os.mkfifo(pipe2)

    # Open FFmpeg as sub-process
    # Use two audio input streams:
    # 1. Named pipe: "audio_pipe1"
    # 2. Named pipe: "audio_pipe2"
    # Merge the two audio streams using amix audio filter.
    # Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                               stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to a named pipe in chunks of 1024 bytes).
    named_pipe_writer1 = NamedPipeWriter(pipe1, samp1)
    named_pipe_writer2 = NamedPipeWriter(pipe2, samp2)

    # Start the two threads
    named_pipe_writer1.start()
    named_pipe_writer2.start()

    # Wait for the two writer threads to finish
    named_pipe_writer1.join()
    named_pipe_writer2.join()

    process.wait()  # Wait for FFmpeg sub-process to finish

    # Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)
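The remark about keeping the writers in a list and iterating over them can be sketched as follows. This is only an illustration under some assumptions: simple reader threads stand in for FFmpeg so the example runs on its own, the helpers read_all and send_all are invented for it, and the pipes are created in a temporary directory.

```python
import os
import tempfile
from threading import Thread


class NamedPipeWriter(Thread):
    """Write data (in small chunks) to a named pipe using a thread."""

    def __init__(self, pipe_name, data, chunk_size=1024):
        super().__init__()
        self._pipe_name = pipe_name
        self._data = data
        self._chunk_size = chunk_size

    def run(self):
        fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # blocks until a reader opens
        try:
            for i in range(0, len(self._data), self._chunk_size):
                chunk = self._data[i:i + self._chunk_size]
                while chunk:  # os.write may accept fewer bytes than requested
                    chunk = chunk[os.write(fd_pipe, chunk):]
        finally:
            os.close(fd_pipe)


def read_all(pipe_name, results, index):
    # Stand-in for one FFmpeg input: read the pipe until end-of-file.
    with open(pipe_name, "rb") as f:
        results[index] = f.read()


def send_all(samples):
    """Send every sample through its own named pipe; return what was read."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        pipe_names = [os.path.join(tmp_dir, f"audio_pipe{i + 1}")
                      for i in range(len(samples))]
        for name in pipe_names:
            os.mkfifo(name)

        results = [None] * len(samples)
        readers = [Thread(target=read_all, args=(name, results, i))
                   for i, name in enumerate(pipe_names)]
        writers = [NamedPipeWriter(name, data)
                   for name, data in zip(pipe_names, samples)]

        for thread in readers + writers:
            thread.start()
        for thread in writers + readers:
            thread.join()

        for name in pipe_names:
            os.unlink(name)
        return results


if __name__ == "__main__":
    samples = [b"a" * 100000, b"b" * 5, b"c" * 70000]
    print(send_all(samples) == samples)
```

With FFmpeg in place of the reader threads, the "-i" arguments and the inputs count of amix would presumably be built from the same pipe_names list, so adding a fifth sample only means adding one more entry.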
Notes: