
Streaming audio to DialogFlow for real-time intent recognition


I'm trying to stream audio from a Pepper robot's microphone to DialogFlow. I already have working code for sending a single block of audio. When I send the streaming request, the response contains the message "None Exception iterating requests!". I've seen this error before when reading from an audio file, but I fail to see what's wrong with the data I'm passing now.

processRemote is called whenever the microphone records something. The solution does work when I write sound_data[0].tostring() to a StringIO and later read it back in chunks of 4096 bytes.

self.processing_queue is supposed to hold a few chunks of audio that should be processed before working on new audio.

The error occurs in the response for self.session_client.streaming_detect_intent(requests).

I'd be thankful for any ideas.

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """audio stream callback method with simple silence detection"""
        sound_data_interlaced = np.fromstring(str(inputBuffer), dtype=np.int16)
        sound_data = np.reshape(sound_data_interlaced,
                                (nbOfChannels, nbOfSamplesByChannel), 'F')
        peak_value = np.max(sound_data)
        chunk = sound_data[0].tostring()
        self.processing_queue.append(chunk)
        if self.is_active:
            # detect sound
            if peak_value > 6000:
                print("Peak:", peak_value)
                if not self.recordingInProgress:
                    self.startRecording()

            # if recording is in progress we send directly to google
            try:
                if self.recordingInProgress:
                    print("preparing request proc remote")
                    requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
                    print("should send now")
                    responses = self.session_client.streaming_detect_intent(requests)
                    for response in responses:
                        print("checking response")
                        if len(response.fulfillment_text) != 0:
                            print("response not empty")
                            self.stopRecording(response)  # stop if we already know the intent
            except Exception as e:
                print(e)

    def startRecording(self):
        """init an in-memory file object and save the last raw sound buffer to it."""
        # session path setup
        self.session_path = self.session_client.session_path(DIALOG_FLOW_GCP_PROJECT_ID, self.uuid)
        self.recordingInProgress = True
        requests = list()

        # set up streaming
        print("start streaming")
        q_input = dialogflow.types.QueryInput(audio_config=self.audio_config)
        req = dialogflow.types.StreamingDetectIntentRequest(
                        session=self.session_path, query_input=q_input)
        requests.append(req)

        # process pre-recorded audio
        print("work on stored audio")
        for chunk in self.processing_queue:
            print("appending chunk")
            try:
                requests.append(dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk))
            except Exception as e:
                print(e)
        print("getting response")
        responses = self.session_client.streaming_detect_intent(requests)
        print("got response")
        print(responses)

        # iterate through responses from pre-recorded audio
        try:
            for response in responses:
                print("checking response")
                if len(response.fulfillment_text) != 0:
                    print("response not empty")
                    self.stopRecording(response)  # stop if we already know the intent
        except Exception as e:
            print(e)

        # otherwise continue listening
        print("start recording (live)")

    def stopRecording(self, query_result):
        """saves the recording to memory"""
        # stop recording
        self.recordingInProgress = False
        self.disable_google_speech(force=True)
        print("stopped recording")

        # process response
        action = query_result.action
        text = query_result.fulfillment_text.encode("utf-8")
        if (action is not None) or (text is not None):
            if len(text) != 0:
                self.speech.say(text)
            if len(action) != 0:
                parameters = query_result.parameters
                self.execute_action(action, parameters)

Solution

  • According to the source code, session_client.streaming_detect_intent expects an iterator as its argument, but you are currently passing it a list of requests. A list is iterable, but it is not itself an iterator.

    Won't work:

    requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
    responses = self.session_client.streaming_detect_intent(requests) 
    # fails with: None Exception iterating requests!
    

    Alternatives:

    # wrap the list in an iterator
    requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
    responses = self.session_client.streaming_detect_intent(iter(requests))
    
    # Note: The example in the source code calls the function like this
    # but this gave me the same error
    requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
    for response in self.session_client.streaming_detect_intent(requests):
        pass  # process response
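
    The difference between a list and an iterator can be reproduced without Dialogflow. This is a minimal sketch, assuming the underlying client pulls requests with next(), which would explain why a plain list fails; the strings are placeholders for real request objects.

```python
# Placeholders standing in for StreamingDetectIntentRequest objects.
requests = ["config", "audio-1", "audio-2"]

# A list is iterable, but it is not an iterator: next() rejects it.
try:
    next(requests)
    list_works = True
except TypeError as exc:
    list_works = False
    error_message = str(exc)

# iter() returns an iterator over the same items, which next() accepts.
request_iterator = iter(requests)
first = next(request_iterator)

print(list_works, first)
```

    If that assumption holds, the TypeError raised here is what surfaces as "Exception iterating requests!".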
    

    Using a generator

    While wrapping the list in iter() fixed the error, the intent detection still didn't work. I believe a better program structure is to use a generator, as suggested in the docs. Something like this (pseudo-code):

    def dialogflow_mic_stream_generator():
        # open stream
        audio_stream = ...
    
        # send configuration request
        query_input = dialogflow.types.QueryInput(audio_config=audio_config)
        yield dialogflow.types.StreamingDetectIntentRequest(session=session_path,
                query_input=query_input)
    
        # output audio data from stream
        while audio_stream_is_active:
            chunk = audio_stream.read(chunk_size)
            yield dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)
    
    requests = dialogflow_mic_stream_generator()
    responses = session_client.streaming_detect_intent(requests)
    
    for response in responses:
        pass  # process response
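
    In the question, audio arrives through a callback (processRemote) rather than from a stream that can be read. One way to connect the two is a thread-safe queue: the callback puts chunks in, and the generator takes them out. The following is only a sketch under assumptions: request_stream, make_audio_request, fake_callback and the None sentinel are hypothetical names, and plain dicts stand in for the Dialogflow request objects so that it runs on its own. On Python 2 the queue module is named Queue.

```python
import queue  # named Queue on Python 2
import threading

END_OF_AUDIO = None  # hypothetical sentinel the producer puts to end the stream


def request_stream(chunk_queue, config_request, make_audio_request):
    """Yield the config request first, then one request per queued chunk."""
    yield config_request
    while True:
        chunk = chunk_queue.get()  # blocks until the callback supplies audio
        if chunk is END_OF_AUDIO:
            return
        yield make_audio_request(chunk)


chunk_queue = queue.Queue()


def fake_callback():
    """Stand-in for processRemote: push two chunks, then signal the end."""
    for chunk in (b"\x00\x01", b"\x02\x03"):
        chunk_queue.put(chunk)
    chunk_queue.put(END_OF_AUDIO)


producer = threading.Thread(target=fake_callback)
producer.start()

# Dicts replace the real request types; a client call would consume the
# generator here instead of list().
sent = list(request_stream(chunk_queue, {"config": True},
                           lambda chunk: {"input_audio": chunk}))
producer.join()
print(sent)
```

    With the real client, the generator would be passed to streaming_detect_intent once, and processRemote would only put chunks on the queue instead of starting a new streaming call for every chunk.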