Search code examples
pythontwistedunix-sockettwisted.internet

Unhandled error in Deferred? Using UNIX socket with Twisted


I'm completely new to network programming and event driven events. However I was able to successfully implement a pub-sub scheme using a TCP connection between my machine (server) and client machines for testing (command line). However, I need to actually use a UNIX socket with Twisted.

I am getting the following error when running the code:

Unhandled error in Deferred

Deferred Error

Here is my code for pub_sub.py:

"""
a networking implementation of PubSub using Twisted.
=============
PubSub Server
=============
A PubSub server listens for subscription requests and publish commands, and, when
published to, sends data to subscribers. All incoming and outgoing requests are
encoded in JSON.
A Subscribe request looks like this:
    {
        "command": "subscribe",
        "topic": "hello"
    }
A Publish request looks like this:
    {
        "command": "publish",
        "topic": "hello",
        "data": {
            "world": "WORLD"
        }
    }
When the server receives a Publish request, it will send the 'data' object to all
subscribers of 'topic'.
"""

import argparse
import json
import logging

from collections import defaultdict

from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet.endpoints import UNIXClientEndpoint, UNIXServerEndpoint, \
                                       connectProtocol
from twisted.internet.protocol import Protocol, Factory


class PubSubProtocol(Protocol):

    def __init__(self, topics):
        self.topics = topics
        self.subscribed_topic = None

    def connectionLost(self, reason):
        print("Connection lost: {}".format(reason))
        if self.subscribed_topic:
            self.topics[self.subscribed_topic].remove(self)

    def dataReceived(self, data):
        print("Data received: {}".format(data))
        try:
            request = json.loads(data)
        except ValueError:
            logging.debug("ValueError on deconding incoming data. "
                          "Data: {}".format(data), exc_info=True)
            self.transport.loseConnection()
            return

        if request['command'] == 'subscribe':
            self.handle_subscribe(request['topic'])
        elif request['command'] == 'publish':
            self.handle_publish(request['topic'], request['data'])

    def handle_subscribe(self, topic):
        print("Subscribed to topic: {}".format(topic))
        self.topics[topic].add(self)
        self.subscribed_topic = topic

    def handle_publish(self, topic, data):
        request = json.dumps(data)

        for protocol in self.topics[topic]:
            protocol.transport.write(request)
        print("Publish sent for topic: {}".format(topic))


class PubSubFactory(Factory):

    def __init__(self):
        self.topics = defaultdict(set)

    def buildProtocol(self, addr):
        return PubSubProtocol(self.topics)


class PublisherProtocol(Protocol):
    """
    Publish protocol for sending data to client, i.e. front-end web GUI.
    """
    def __init__(self, topic, **kwargs):
        self.topic = topic
        self.kwargs = kwargs

    def connectionMade(self):
        request = json.dumps({
            'command': 'publish',
            'topic': self.topic,
            'data': self.kwargs,
        })

        self.transport.write(request)
        self.transport.loseConnection()


class SubscriberProtocol(Protocol):
    """
    Subscriber protocol for client sending a request to subscribe to a specific
    topic.
    """
    def __init__(self, topic, callback):
        self.topic = topic
        self.callback = callback

    def connectionMade(self):
        request = json.dumps({
            'command': 'subscribe',
            'topic': self.topic,
        })

        self.transport.write(request)

    def dataReceived(self, data):
        kwargs = json.loads(data)

        self.callback(**kwargs)


class PubSub(object):

    def __init__(self, path='./.sock'):
        self.path = FilePath(path)
        self.reactor = reactor

    def _make_connection(self, protocol):
        endpoint = UNIXClientEndpoint(reactor, self.path)
        connection = connectProtocol(endpoint, protocol)

    def subscribe(self, topic, callback):
        """
        Subscribe 'callback' callable to 'topic'.
        """
        sub = SubscriberProtocol(topic, callback)
        self._make_connection(sub)

    def publish(self, topic, **kwargs):
        """
        Publish 'kwargs' to 'topic', calling all callables subscribed to 'topic'
        with the arguments specified in '**kwargs'.
        """
        pub = PublisherProtocol(topic, **kwargs)
        self._make_connection(pub)

    def run(self):
        """
        Convenience method to start the Twisted event loop.
        """
        self.reactor.run()


def main():
    path = FilePath("./.sock")
    endpoint = UNIXServerEndpoint(reactor, path)
    endpoint.listen(PubSubFactory())

    reactor.run()


if __name__ == '__main__':
    main()

Any help would be greatly appreciated on what I am doing wrong.

Thank you,

Brian


Solution

  • You appear to be running your software on Windows. Alas, UNIX sockets are not available on Windows. If you want to use UNIX sockets, you need to use a more POSIX-ish environment - Linux, *BSD, OS X, etc.