Tags: python, json, audio, websocket, autobahn

WebSocket relay with Autobahn Python


I am trying to build a WebSocket server using Autobahn Python that acts as a man-in-the-middle, or relay, for IBM Watson's speech-to-text service. I have already managed to receive the streaming audio from the client and forward it to Watson via a queue, and I am receiving transcription hypotheses back from Watson as JSON data on my server, but I am not sure how to then forward that JSON data on to the client. The Watson transcription-side callback and the Autobahn client-side callback seem to exist independently: I can't call a routine from one callback inside the other, or access one callback's data from the other.

Do I need to set up some kind of shared text-message queue? I am sure it should be something simple, but I think the problem may be my lack of understanding of "self", which seems to be isolating the two routines. I would also appreciate any resources on understanding "self".
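
To illustrate what I mean about "self" isolating the two routines, here is my rough mental model (a tiny made-up example, not part of my server): each object's methods only see that particular object's own attributes.

class Counter:
    def __init__(self):
        self.count = 0      # attribute stored on this particular instance

    def increment(self):
        self.count += 1     # "self" is whichever Counter the method was called on

a = Counter()
b = Counter()
a.increment()
print(a.count, b.count)     # prints "1 0" -- the two objects are independent

If that's right, then the self inside MyRecognizeCallback is the recognizer callback object, not the server protocol instance, which would explain why I can't just call self.sendMessage() there.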

# For Watson
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For Autobahn
from autobahn.twisted.websocket import WebSocketServerProtocol, \
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue, Full
except ImportError:
    from queue import Queue, Full

###############################################
#### Initialize queue to store the recordings ##
###############################################
CHUNK = 1024
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# initialize speech to text service
authenticator = IAMAuthenticator('secretapikeycanttellyou')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    def __init__(self):
        RecognizeCallback.__init__(self)

    def on_transcription(self, transcript):
        print(transcript)

    def on_connected(self):
        print('Connection was successful')

    def on_error(self, error):
        print('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        print('Inactivity timeout: {}'.format(error))

    def on_listening(self):
        print('Service is listening')

    def on_hypothesis(self, hypothesis):
        print(hypothesis)
        #self.sendMessage(hypothesis, isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    def on_data(self, data):
        print(data)
        #self.sendMessage(data, isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    def on_close(self):
        print("Connection closed")

# define callback for client-side websocket in Autobahn
class MyServerProtocol(WebSocketServerProtocol):

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")
        recognize_thread = Thread(target=recognize_using_websocket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        if isBinary:
            # put audio in queue
            q.put(payload)
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

        # echo back message verbatim
        self.sendMessage(payload, isBinary)
    
    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
        
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_websocket(*args):
    mycallback = MyRecognizeCallback()
    speech_to_text.recognize_using_websocket(audio=audio_source,
                                             content_type='audio/l16; rate=16000',
                                             recognize_callback=mycallback,
                                             interim_results=True)

if __name__ == '__main__':

    factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9001, factory)
    reactor.run()

It seems I need to bridge the gap between MyRecognizeCallback() and MyServerProtocol(). Please also let me know if this is a terrible implementation of what I am trying to accomplish. I know there are easier ways to relay websocket data, but I wanted to familiarize myself with the WebSocket API, audio streaming, and text messages, since eventually I want to cut Watson out of the equation and use my own transcription algorithms.


Solution

  • Based on the answers here, it seems that my efforts to call MyServerProtocol().sendMessage(u"this is a message2".encode('utf8')) from main were in fact creating a new and unrelated instance of MyServerProtocol rather than piping messages into the existing connection. I was able to send new messages into the open websocket connection using the method described here.

    Here is my final code, which still needs some work, but the relevant definition is the broadcast_message class method. It was also necessary to 'subscribe' each connection in onConnect and 'unsubscribe' it in onClose for this method to work (a rough sketch of a test client follows after the code):

    from ibm_watson import SpeechToTextV1
    from ibm_watson.websocket import RecognizeCallback, AudioSource
    from threading import Thread
    from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
    # For autobahn
    import json
    from autobahn.twisted.websocket import WebSocketServerProtocol, \
        WebSocketServerFactory
    from twisted.internet import reactor
    
    try:
        from Queue import Queue, Full
    except ImportError:
        from queue import Queue, Full
    
    ###############################################
    #### Initialize queue to store the recordings ##
    ###############################################
    CHUNK = 1024
    # Note: incoming audio is discarded if it can't be consumed fast enough,
    # so increase the max size as needed
    BUF_MAX_SIZE = CHUNK * 10
    # Buffer to store audio
    q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
    # Create an instance of AudioSource
    audio_source = AudioSource(q, True, True)
    
    ###############################################
    #### Prepare Speech to Text Service ########
    ###############################################
    
    # initialize speech to text service
    authenticator = IAMAuthenticator('secretapikey')
    speech_to_text = SpeechToTextV1(authenticator=authenticator)
    
    # define callback for the speech to text service
    class MyRecognizeCallback(RecognizeCallback):
        def __init__(self):
            RecognizeCallback.__init__(self)
    
        def on_transcription(self, transcript):
            # Forward to client
            MyServerProtocol.broadcast_message(transcript)
    
        def on_connected(self):
            print('Connection was successful')
    
        def on_error(self, error):
            # Forward to client
            MyServerProtocol.broadcast_message('Error received: {}'.format(error))
    
        def on_inactivity_timeout(self, error):
            # Forward to client
            MyServerProtocol.broadcast_message('Inactivity timeout: {}'.format(error))
    
        def on_listening(self):
            print('Service is listening')
    
        def on_hypothesis(self, hypothesis):
            # Forward to client
            MyServerProtocol.broadcast_message(hypothesis)
    
        def on_data(self, data):
            # Forward to client
            MyServerProtocol.broadcast_message(data)
    
        def on_close(self):
            print("Connection closed")
            MyServerProtocol.broadcast_message("Connection closed")
    
    class MyServerProtocol(WebSocketServerProtocol):
        connections = list()
    
        def onConnect(self, request):
            print("Client connecting: {0}".format(request.peer))
            self.connections.append(self)
            # Start recognizer on connection
            recognize_thread = Thread(target=recognize_using_websocket, args=())
            recognize_thread.daemon = True
            recognize_thread.start()
    
        def onOpen(self):
            print("WebSocket connection open.")
    
        def onMessage(self, payload, isBinary):
            if isBinary:
                # Put incoming audio into the queue
                try:
                    q.put(payload)
                except Full:
                    pass # discard
            else:
                print("Text message received: {0}".format(payload.decode('utf8')))
    
        @classmethod
        def broadcast_message(cls, data):
            # The Watson callbacks fire on their own thread, so hand the
            # actual send over to the Twisted reactor thread
            payload = json.dumps(data, ensure_ascii=False).encode('utf8')
            for c in set(cls.connections):
                reactor.callFromThread(cls.sendMessage, c, payload)
    
        def onClose(self, wasClean, code, reason):
            print("WebSocket connection closed: {0}".format(reason))
            self.connections.remove(self)
      
    ## this function will initiate the recognize service and pass in the AudioSource
    def recognize_using_websocket(*args):
        mycallback = MyRecognizeCallback()
        speech_to_text.recognize_using_websocket(audio=audio_source,
                                                content_type='audio/l16; rate=16000',
                                                recognize_callback=mycallback,
                                                interim_results=True)
    
    if __name__ == '__main__':

        factory = WebSocketServerFactory("ws://127.0.0.1:9001")
        factory.protocol = MyServerProtocol

        reactor.listenTCP(9001, factory)
        reactor.run()
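
    To try it end to end, here is a rough sketch of a test client (not from the original post) that connects to the relay, streams a local raw PCM file (16-bit, 16 kHz mono, matching the 'audio/l16; rate=16000' content type) as binary frames, and prints whatever text frames come back. The file path, chunking, and names are assumptions for illustration:

    # Rough test-client sketch; assumes a raw l16/16000 audio file at AUDIO_PATH
    from autobahn.twisted.websocket import WebSocketClientProtocol, \
        WebSocketClientFactory, connectWS
    from twisted.internet import reactor

    CHUNK = 1024
    AUDIO_PATH = 'test_audio.raw'   # hypothetical raw PCM file (16-bit, 16 kHz, mono)

    class RelayTestClient(WebSocketClientProtocol):

        def onOpen(self):
            print("Connected to relay, streaming audio...")
            # Send the whole file as CHUNK-sized binary frames; a real client
            # would pace these at roughly real time
            with open(AUDIO_PATH, 'rb') as f:
                while True:
                    chunk = f.read(CHUNK)
                    if not chunk:
                        break
                    self.sendMessage(chunk, isBinary=True)

        def onMessage(self, payload, isBinary):
            if not isBinary:
                # Text frames carry the JSON the relay forwards from Watson
                print("Transcription: {0}".format(payload.decode('utf8')))

        def onClose(self, wasClean, code, reason):
            print("Connection closed: {0}".format(reason))

    if __name__ == '__main__':
        factory = WebSocketClientFactory("ws://127.0.0.1:9001")
        factory.protocol = RelayTestClient
        connectWS(factory)
        reactor.run()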