I want to create a web server that automatically handles "orders" when it receives a POST request.
My code so far looks like this:
from json import loads

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application, url, RequestHandler

order_list = list()


class MainHandler(RequestHandler):
    def get(self):
        pass

    def post(self):
        if self.__authenticate_incoming_request():
            payload = loads(self.request.body)
            order_list.append(payload)
        else:
            pass

    def __authenticate_incoming_request(self) -> bool:
        # some request authentication code
        return True


def start_server():
    application = Application([
        url(r"/", MainHandler)
    ])
    server = HTTPServer(application)
    server.listen(8080)
    IOLoop.current().start()


if __name__ == '__main__':
    start_server()
Here is what I want to achieve: perform an action A n-times, based on a value defined in the request.body (concurrently, if possible).
Previously, to perform action A n-times, I have used a concurrent.futures.ThreadPoolExecutor, but I am not sure how I should handle this correctly with a web server running in parallel.
My idea was something like this:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

start_server()

tpe = ThreadPoolExecutor(max_workers=10)
while True:
    if order_list:
        new_order = order_list.pop(0)
        tpe.submit(my_action, new_order)  # my_action is my desired function
    sleep(5)
Now this piece of code is of course blocking, and I was hoping that the web server would continue running in parallel while I am running my while-loop.
Is a setup like this possible? Do I maybe need to utilize other modules? Any help greatly appreciated!
It's not working as expected because time.sleep is a blocking function (and, in the snippet above, IOLoop.current().start() inside start_server() never returns, so the while-loop would never even be reached).
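As a quick illustration (a hypothetical toy server, not part of the solution below): any blocking call on the IOLoop's thread, whether time.sleep or a busy while-loop, freezes Tornado until it returns, whereas tornado.gen.sleep yields control back to the loop:

import time

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler, url


class BlockingHandler(RequestHandler):
    def get(self):
        # time.sleep blocks the IOLoop's thread: no other request
        # is handled until these 5 seconds are over
        time.sleep(5)
        self.write("done (blocking)")


class NonBlockingHandler(RequestHandler):
    async def get(self):
        # gen.sleep suspends only this coroutine; the IOLoop keeps
        # serving other requests in the meantime
        await gen.sleep(5)
        self.write("done (non-blocking)")


if __name__ == '__main__':
    app = Application([
        url(r"/blocking", BlockingHandler),
        url(r"/non-blocking", NonBlockingHandler),
    ])
    app.listen(8080)
    IOLoop.current().start()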
Instead of using a list plus a while-loop that sleeps between checks, use Tornado's queues.Queue, which lets you wait for new items asynchronously.
from concurrent.futures import ThreadPoolExecutor  # ThreadPoolExecutor lives here, not in threading

from tornado.queues import Queue

order_queue = Queue()
tpe = ThreadPoolExecutor(max_workers=10)


async def queue_consumer():
    # The old while-loop is now converted into a coroutine
    # and an async for loop is used instead
    async for new_order in order_queue:
        # run your function in the thread pool
        IOLoop.current().run_in_executor(tpe, my_action, new_order)


def start_server():
    # ...
    # run the queue_consumer coroutine before starting the loop
    IOLoop.current().spawn_callback(queue_consumer)
    IOLoop.current().start()
Finally, in MainHandler.post, put incoming orders in the queue instead of appending them to order_list:
order_queue.put(payload)
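For reference, here is a minimal end-to-end sketch of how these pieces could fit together; my_action is just a placeholder for whatever work each order requires, and everything else follows the snippets above:

from concurrent.futures import ThreadPoolExecutor
from json import loads
from time import sleep

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.queues import Queue
from tornado.web import Application, url, RequestHandler

order_queue = Queue()
tpe = ThreadPoolExecutor(max_workers=10)


def my_action(order):
    # placeholder for the real (blocking) work done per order
    sleep(1)
    print("processed order:", order)


class MainHandler(RequestHandler):
    def post(self):
        if self.__authenticate_incoming_request():
            payload = loads(self.request.body)
            order_queue.put(payload)  # enqueue instead of appending to a list

    def __authenticate_incoming_request(self) -> bool:
        # some request authentication code
        return True


async def queue_consumer():
    # wait for new orders asynchronously and hand each one to the thread pool
    async for new_order in order_queue:
        IOLoop.current().run_in_executor(tpe, my_action, new_order)


def start_server():
    application = Application([
        url(r"/", MainHandler)
    ])
    server = HTTPServer(application)
    server.listen(8080)
    IOLoop.current().spawn_callback(queue_consumer)
    IOLoop.current().start()


if __name__ == '__main__':
    start_server()

You could then test it with something like curl -X POST --data '{"item": "foo"}' http://localhost:8080/ and watch the orders being processed by the thread pool while the server keeps accepting new requests.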