Search code examples
pythonasynchronousasyncore

python; asyncore handle_read; do I need a seperate thread?


From asyncore's documentation: https://docs.python.org/2/library/asyncore.html

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

  def __init__(self, host, path):
      asyncore.dispatcher.__init__(self)
      self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
      self.connect( (host, 80) )
      self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

  def handle_connect(self):
      pass

  def handle_close(self):
      self.close()

  def handle_read(self):
      print self.recv(8192)

  def writable(self):
      return (len(self.buffer) > 0)

  def handle_write(self):
      sent = self.send(self.buffer)
      self.buffer = self.buffer[sent:]

  client = HTTPClient('www.python.org', '/')
  asyncore.loop()

Now suppose instead we have:

def handle_read(self):
    data = self.recv(8192)
    //SOME REALLY LONG AND COMPLICATED THING

Is this handled in Asyncore itself due to asyncore's polling/select methodlogy, or do I need to do:

def handle_read(self):
    data = self.recv(8192)
    h = Handler(data)
    h.start()

class Handler(threading.Thread):
    def __init__(self, data):
        threading.Thread.__init__(self)
        self.data = data
    def run():
        //LONG AND COMPLICATED THING WITH DATA

If I do need a thread, do I want h.join() after start? It seems to work, but since join blocks, I'm not exactly sure why.


Solution

  • I am posting my own answer because it was inspired by Orest Hera's answer, but because I have knowledge of my workload, it is a slight variant.

    My workload is such that requests can arrive in bursts, but these burts are sporadic (non-stationary). Moreover, they need to be processed in order they are received. So, here is what I did:

    #! /usr/bin/env python3
    
    import asyncore #https://docs.python.org/2/library/asyncore.html
    import socket
    import threading    
    import queue
    import time
    
    fqueue = queue.Queue()
    
    class Handler(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.keep_reading = True
    
        def run(self):
            while self.keep_reading:
                if fqueue.empty():
                    time.sleep(1)
                else: 
                    #PROCESS
        def stop(self):
            self.keep_reading = False
    
    
    class Listener(asyncore.dispatcher): #http://effbot.org/librarybook/asyncore.htm
        def __init__(self, host, port):
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((host, port))
    
    
        def handle_read(self):
            data = self.recv(40) #pretend it always waits for 40 bytes
            fqueue.put(data)
    
        def start(self):
            try:
                h = Handler()
                h.start()
                asyncore.loop()
            except KeyboardInterrupt:
                pass
            finally:
                h.stop()