Search code examples
pythonsocketswebsocketpublish-subscribechannel

How to create channel with socket in Python


I've started Python a few times ago and now, I'm currently creating a socket server. I already have the server functioning with multiple threads with multiple clients (Hurray !) But I'm looking for functionality I can't call (i don't even know if it exists) I would like to create a kind of channel where client can send different type of message.

An example I create a channel INFO and if the server received this type of socket it just does a print

I create another channel DEBUG where I can send custom command which the server will execute

etc

In a non-programming language it will do this:

def socketDebug(command):
     run command

def socketInfo(input):
     print input

if socket == socketDebug:
     socketDebug(socket.rcv)
else:
   if socket == socketInfo:
     socketInfo(socket.rcv)

I hope I'm clear.


Solution

  • Here is a quite simple implementation of a Channel class. It creates a socket, to accept connections from clients and to send messages. It is also a client itself, receiving messages from other Channel instances (in separate processes for example).

    The communication is done in two threads, which is pretty bad (I would use async io). when a message is received, it calls the registered function in the receiving thread which can cause some threading issues.

    Each Channel instance creates its own sockets, but it would be much more scalable to have channel "topics" multiplexed by a single instance.

    Some existing libraries provide a "channel" functionality, like nanomsg.

    The code here is for educational purposes, if it can help...

    import socket
    import threading
    
    class ChannelThread(threading.Thread):
      def __init__(self):
        threading.Thread.__init__(self)
    
        self.clients = []
        self.chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.chan_sock.bind(('',0))  
        _, self.port = self.chan_sock.getsockname()
        self.chan_sock.listen(5)
        self.daemon=True
        self.start()
    
      def run(self):
        while True:
          new_client = self.chan_sock.accept()
          if not new_client:
            break
          self.clients.append(new_client)
    
      def sendall(self, msg):
        for client in self.clients:
          client[0].sendall(msg)
    
    class Channel(threading.Thread):
      def __init__(self):
        threading.Thread.__init__(self)
    
        self.daemon = True
        self.channel_thread = ChannelThread()
    
      def public_address(self):
        return "tcp://%s:%d" % (socket.gethostname(), self.channel_thread.port)
    
      def register(self, channel_address, update_callback):
        host, s_port = channel_address.split("//")[-1].split(":")
        port = int(s_port)
        self.peer_chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)   
        self.peer_chan_sock.connect((host, port))
        self._callback = update_callback
        self.start()
    
      def deal_with_message(self, msg):
        self._callback(msg)
    
      def run(self):
        data = ""
        while True:
          new_data = self.peer_chan_sock.recv(1024)
          if not new_data:
            # connection reset by peer
            break
          data += new_data
          msgs = data.split("\n\n")
          if msgs[-1]:
            data = msgs.pop()
          for msg in msgs:
            self.deal_with_message(msg)
    
      def send_value(self, channel_value):
        self.channel_thread.sendall("%s\n\n" % channel_value)
    

    Usage:

    In process A:

    c = Channel()
    c.public_address()
    

    In process B:

    def msg_received(msg):
      print "received:", msg
    
    c = Channel()
    c.register("public_address_string_returned_in_process_A", msg_received)
    

    In process A:

    c.send_value("HELLO")
    

    In process B:

    received: HELLO