I am developing an algorithm for real-time speaker identification. My idea was to run three tasks, namely writeAudio(), detectionBlock(), and identificationBlock(), in parallel using the multiprocessing module.
Specifically, the writeAudio() function uses PyAudio to capture a continuous recording and saves it as 0.5-second audio files in a local directory; the detectionBlock() function processes the two oldest 0.5-second files from that directory and uses a Voice Activity Detection (VAD) model to determine whether the audio is speech or noise; and the identificationBlock() function processes a separate 3-second audio file (saved to a different directory from a chunk of 0.5-second audio files) and uses a Voice Recognition (VR) model to determine the speaker's identity.
I was hoping that I could apply multiprocessing here to sidestep the Global Interpreter Lock (GIL) and run the three functions simultaneously as Process objects. Currently, the program won't start running the detectionBlock() or identificationBlock() functions until after writeAudio() has finished recording.
Here's the code for the current implementation with multiprocessing:
from multiprocessing import Process

# Perform Parallel Processing with the Multiprocessing Module
def parallelProcessing(self):
    # Define Individual Functions as Process() Objects
    rec = Process(target=self.writeAudio()) # Cog 1
    vad = Process(target=self.detectionBlock()) # Cog 2
    si = Process(target=self.identificationBlock()) # Cog 3
    cogs = [rec, vad, si] # List of functions

    # Run All Three Cogs in Parallel
    rec.start() # Start Cog 1
    time.sleep(3) # Wait 3 sec to start speech detection & identification
    vad.start() # Start Cog 2
    si.start() # Start Cog 3
    for cog in cogs:
        cog.join() # Wait for processes to complete before continuing
I've never applied multiprocessing before, so I was wondering if this would be feasible with a different implementation approach. Thanks for your help.
EDIT:
I've added simplified versions of the functions below for enhanced clarity.
# Speech Detection Sequence
def detectionBlock(self):
    # Create VoiceActivityDetectionModel() Class Object
    vad = VoiceActivityDetectionModel()

    # Run Speech Detection on Oldest Audio Segments in Directory
    files = self.getListDir() # List of audiofiles
    index = 0 # First file in list
    path_1 = os.path.join(self.VAD_audio_path, files[index])
    path_2 = os.path.join(self.VAD_audio_path, files[index+1])
    label_1, _, _ = vad.detectFromAudiofile(path_1) # VAD classifier for first segment
    label_2, _, _ = vad.detectFromAudiofile(path_2) # VAD classifier for second segment
    if (label_1 == 'speech') and (label_2 == 'speech'):
        self.combineAudio(index) # Generate 3-sec recording for SI if
                                 # speech is detected in both audiofiles
    else:
        self.deleteAudio() # Remove oldest audio segment

# Speaker Identification Sequence
def identificationBlock(self):
    # Create EnsemblePredictions() Class Object
    ep = EnsemblePredictions()

    # Run Speaker Identification on Oldest Audio Segment in Directory
    files = self.getListDir(audio_type='SI')
    index = 0 # First file in list
    if files:
        filepath = os.path.join(self.SI_audio_path, files[index])
        speaker, prob_list = ep.predict(filepath, first_session=False) # SI classifier
        time_stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) # Time of identification
        self.speakerDiarization(speaker=speaker, prob_list=prob_list, time_stamp=time_stamp) # Save results

        # Remove 3-Second Audio Segment from Directory
        self.deleteAudio(audio_type='SI')
# Audio Recording Sequence
def writeAudio(self):
    # Instantiate Recording System Variables
    FORMAT = pyaudio.paFloat32 # 32 bits per sample
    CHANNELS = 1 # Mono
    RATE = 16000 # Sampling Rate
    CHUNK = int(self.VAD_audio_length*RATE) # Chunks of bytes to record from microphone

    # Initialize Recording
    p = pyaudio.PyAudio() # Create interface to PortAudio
    input('Press ENTER to Begin Recording') # Wait for keypress to record
    if keyboard.is_pressed('Enter'):
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        frames_per_buffer=CHUNK,
                        input=True)
        print()
        print('Hold SPACE to Finish Recording')
        while(True):
            # End Process with Manual User Interrupt
            if keyboard.is_pressed('Space'):
                break

            # Generate Audio Recording
            data = stream.read(CHUNK) # Read 0.5-second segment from audio stream
            data = np.frombuffer(data, dtype=np.float32) # Convert to NumPy array
            filename = 'VAD_segment_' + str(self.VAD_audio_count) + '.wav'
            sf.write(os.path.join(self.VAD_audio_path, filename), data, RATE)

            # Adjust Segment Count
            self.VAD_audio_count = self.VAD_audio_count + 1 # Increment

        # Stop & Close Stream
        stream.stop_stream()
        stream.close()

    # Terminate PortAudio Interface
    p.terminate()
Here's an example of what I mentioned in the comments. I don't have all the components to actually run it, so treat it a bit like pseudocode, but I believe it should be a good jumping-off point. The main improvement is a bit of simplification courtesy of pastream, which claims basically GIL-less PortAudio iteration. The benefit here is less overhead and easier transfer of data to at least the first stage in the pipeline, which is detecting audio. You may need some additional complexity to drop frames in the event of a slowdown, but this structure should generally work, provided I understood the pastream docs correctly.
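import multiprocessing as mp
from queue import Empty  # Python 3 module name (it was `Queue` in Python 2)

import pastream

class ExitFlag:
    """Sentinel sent through the queue to tell the child process to exit."""

def voice_identification(rx_q: mp.Queue):
    while True:
        try:
            received = rx_q.get(timeout=1)  # wait up to 1 second for a chunk
            # If voice_identification is too slow you may want to `get` until
            # the queue is empty to drop all but the most recent frame. This way
            # you won't have an infinitely growing queue.
        except Empty:
            continue  # nothing arrived yet, so `received` is not set; keep waiting
        if isinstance(received, ExitFlag):
            break
        print(identify(received))  # identify audio (placeholder for your own function)
    print("identifier process exiting")

if __name__ == "__main__":
    tx_q = mp.Queue()
    identifier_p = mp.Process(target=voice_identification, args=(tx_q,))
    identifier_p.start()
    samplerate = 44100  # assumed to match the sample rate the stream actually uses
    stream = pastream.InputStream()
    # 3-second chunks every half second. This assumes `overlap` is the number of
    # frames shared between consecutive chunks, so it must be smaller than `chunksize`.
    for chunk in stream.chunks(chunksize=3 * samplerate, overlap=5 * samplerate // 2):
        if detect_audio(chunk):  # detect audio (placeholder for your own function)
            tx_q.put(chunk)
        if exit_key_down():  # however you want to detect this; it's good to ensure smooth shutdown of the child
            tx_q.put(ExitFlag())
            identifier_p.join()
            break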
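
Two details are worth sketching with only the standard library. First, the version in the question runs sequentially because `Process(target=self.writeAudio())` calls `writeAudio()` in the parent process and hands its return value to `Process`. The function object itself has to be passed, with any arguments supplied through `args`. Second, the "drop all but the most recent frame" idea from the comment in `voice_identification` can be a small helper. The names `record`, `identify_worker`, and `drain_latest` below are hypothetical stand-ins for illustration, not part of the original program, and the sleeps only simulate work.

```python
import multiprocessing as mp
import queue
import time


def drain_latest(q, timeout=1.0):
    """Wait up to `timeout` seconds for an item, then skip ahead to the newest one queued."""
    item = q.get(timeout=timeout)  # raises queue.Empty if nothing arrives in time
    while True:
        try:
            item = q.get_nowait()  # replace with any newer item already waiting
        except queue.Empty:
            return item


def record(q, n_chunks):
    """Hypothetical stand-in for the recording stage: emits numbered chunks."""
    for i in range(n_chunks):
        q.put(i)
        time.sleep(0.01)
    q.put(None)  # sentinel: always the last item, tells the consumer to stop


def identify_worker(q):
    """Hypothetical stand-in for a slow identification stage."""
    while True:
        try:
            chunk = drain_latest(q)
        except queue.Empty:
            continue  # nothing arrived within the timeout; keep waiting
        if chunk is None:
            break
        print("identifying chunk", chunk)
        time.sleep(0.05)  # simulate a model that is slower than the recorder


if __name__ == "__main__":
    q = mp.Queue()
    # Pass the function objects themselves (no parentheses); arguments go in `args`.
    rec = mp.Process(target=record, args=(q, 20))
    ident = mp.Process(target=identify_worker, args=(q,))
    rec.start()
    ident.start()
    rec.join()
    ident.join()
```

Because the sentinel is always the last item put on the queue, skipping ahead to the newest item can drop stale chunks but never the shutdown signal.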