Search code examples
Tags: python, flask, deployment, multiprocessing, wsgi

How to deploy a Flask app that uses multiprocessing with a WSGI server?


Suppose I have a video-processing handler in a function, and I want to implement true parallel processing using the multiprocessing module instead of threading.

So my code looks like this in general:

def start_subprocess(pid, progress_dict, delay=1, step=10):
  '''
  Suppose this is video processing starter...

  Simulates a long-running job: every ``delay`` seconds the progress
  advances by ``step`` (capped at 100) and is written to
  ``progress_dict[pid]``, until it reaches 100.

  :param pid: job id used as the key in ``progress_dict``.
  :param progress_dict: mapping shared with the web process (the caller
      passes a ``Manager().dict()`` proxy).
  :param delay: seconds to sleep between updates (default 1, as before).
  :param step: progress increment per update (default 10); must be > 0.
  :raises ValueError: if ``step`` is not positive (the loop would never end).
  '''
  from time import sleep
  if step <= 0:
    raise ValueError('step must be positive')
  # Simulating a subprocess with variable progress
  progress = 0
  while progress < 100:
    sleep(delay)
    # Cap at 100 so a step that does not divide 100 cannot overshoot.
    progress = min(progress + step, 100)
    progress_dict[pid] = progress


def get_current_progress_of_subprocess(pid, progress_dict):
  '''
  Suppose this is video current progress by pid; in this context current
  progress is the number of frames processed so far...

  Return the last progress value recorded for ``pid``, or None if no job
  with that id has been registered.

  A single ``get`` call is used instead of an ``in`` check followed by a
  lookup: with a manager proxy each access is a separate request to the
  manager process, and the two-step form could race with a concurrent
  removal of the key.
  '''
  return progress_dict.get(pid)


def flask_service(progress_dict):
  '''
  Build the Flask app and serve it with Flask's built-in server.

  Routes (both read an integer ``pid`` query argument):
    /start_process -- start a worker Process running start_subprocess for a
                      new pid (400 if pid is missing, invalid or already used).
    /get_progress  -- report the progress stored for pid (404 if unknown).

  :param progress_dict: mapping shared with the worker processes (the
      caller passes a ``Manager().dict()`` proxy); workers write progress
      into it and the routes read it.

  Blocks until the server stops (``app.run``).
  '''
  from threading import Lock
  from flask import Flask, request, jsonify
  from multiprocessing import Process
  app = Flask(__name__)
  # With threaded=True two requests for the same pid can run concurrently;
  # this lock makes "not started yet?" + registration a single step.
  start_lock = Lock()

  def parse_pid():
    '''Read ``pid`` from the query string.

    Returns (pid, None) on success or (None, error_response) otherwise.
    '''
    raw_pid = request.args.get('pid')
    if raw_pid is None:
      return None, (jsonify({'message': 'No pid provided.'}), 400)
    try:
      return int(raw_pid), None
    except ValueError:
      return None, (jsonify({'message': 'Invalid pid.'}), 400)

  @app.route('/start_process')
  def start_process():
    pid, error = parse_pid()
    if error is not None:
      return error

    with start_lock:
      if pid in progress_dict:
        return jsonify({'message': f'Process with pid {pid} already started.'}), 400
      # Register *before* starting the worker so this initial 0 can never
      # overwrite progress the worker has already reported.
      progress_dict[pid] = 0

    # Start a new subprocess
    try:
      Process(target=start_subprocess, args=(pid, progress_dict)).start()
    except Exception:
      # Don't leave an entry behind that would block a retry with this pid.
      progress_dict.pop(pid, None)
      raise

    return jsonify({'message': f'Process started with pid: {pid}'}), 200

  @app.route('/get_progress')
  def get_progress():
    pid, error = parse_pid()
    if error is not None:
      return error

    # Retrieve current progress of the subprocess
    current_progress = get_current_progress_of_subprocess(pid, progress_dict)
    if current_progress is None:
      return jsonify({'message': f'Process with pid {pid} not found.'}), 404
    return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200

  app.run(debug=False, threaded=True)


if __name__ == '__main__':
  from multiprocessing import Process, Manager
  # The Manager process owns the shared dict; leaving the `with` block
  # shuts it down after the web server process has finished.
  with Manager() as manager:
    progress_dict = manager.dict()
    # Run the Flask server in its own process; flask_service starts one
    # worker Process per new pid requested via /start_process.
    p1 = Process(target=flask_service, args=(progress_dict,))
    p1.start()
    try:
      # Block until the server process exits.
      p1.join()
    except KeyboardInterrupt:
      # Ctrl+C in the parent: stop the server process and wait for it.
      p1.terminate()
      p1.join()
    finally:
      print('Ending up!')

I have achieved what I want, but how do I deploy this with a WSGI server? As far as I know, the Flask class (from flask import Flask) creates a WSGI-compatible application instance. So what does this look like in deployment?

Also, am I actually implementing true parallel processing? I just want to make sure that I really am. By true parallelism I mean using the hardware's capabilities (multiple CPU cores) to handle parallel workloads such as video processing.


Solution

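  • I think your code is more complicated than it needs to be. You should create a function, create_app, which creates the Flask application and the progress_dict managed dictionary and then returns the Flask application. Function get_current_progress_of_subprocess remains unchanged, and I added handling of a KeyboardInterrupt in start_subprocess. To deploy, point your WSGI server at the factory, e.g. gunicorn --workers 1 --threads 8 "yourmodule:create_app()". Keep a single worker process, because each worker that calls create_app creates its own Manager and therefore its own progress_dict. As for parallelism: each job runs in its own Process, so jobs can execute on separate CPU cores at the same time. That is true parallelism, unlike threads, which the GIL limits for CPU-bound Python code.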

    def start_subprocess(pid, progress_dict, delay=1, step=10):
      '''
      Suppose this is video processing starter...

      Simulates a long-running job: every ``delay`` seconds the progress
      advances by ``step`` (capped at 100) and is written to
      ``progress_dict[pid]``, until it reaches 100. A KeyboardInterrupt
      ends the job quietly.

      :param pid: job id used as the key in ``progress_dict``.
      :param progress_dict: mapping shared with the web process (the caller
          passes a ``Manager().dict()`` proxy).
      :param delay: seconds to sleep between updates (default 1, as before).
      :param step: progress increment per update (default 10); must be > 0.
      :raises ValueError: if ``step`` is not positive (the loop would never end).
      '''
      from time import sleep
      if step <= 0:
        raise ValueError('step must be positive')
      # Simulating a subprocess with variable progress
      try:
        progress = 0
        while progress < 100:
          sleep(delay)
          # Cap at 100 so a step that does not divide 100 cannot overshoot.
          progress = min(progress + step, 100)
          progress_dict[pid] = progress
      except KeyboardInterrupt:
        # Deliberately swallowed: presumably so Ctrl+C stops workers without
        # each one printing a traceback.
        pass
    
    
    def get_current_progress_of_subprocess(pid, progress_dict):
      '''
      Suppose this is video current progress by pid; in this context current
      progress is the number of frames processed so far...

      Return the last progress value recorded for ``pid``, or None if no job
      with that id has been registered.

      A single ``get`` call is used instead of an ``in`` check followed by a
      lookup: with a manager proxy each access is a separate request to the
      manager process, and the two-step form could race with a concurrent
      removal of the key.
      '''
      return progress_dict.get(pid)
    
    
    def create_app():
      '''
      Application factory: build and return the Flask (WSGI) application.

      Creates a managed dictionary shared with the worker processes started
      by /start_process, and registers two routes that both read an integer
      ``pid`` query argument:
        /start_process -- start a worker Process running start_subprocess for
                          a new pid (400 if pid is missing, invalid or used).
        /get_progress  -- report the progress stored for pid (404 if unknown).

      NOTE(review): every call creates its own Manager. If a WSGI server runs
      several worker processes that each call create_app(), each gets a
      separate progress_dict, and /get_progress may not see jobs started by
      another worker. Run a single worker process (threads are fine) or keep
      the state in an external store.
      '''
      from threading import Lock
      from flask import Flask, request, jsonify
      from multiprocessing import Process, Manager

      app = Flask(__name__)

      # Create the managed dictionary:
      progress_dict = Manager().dict()
      # Concurrent request threads may ask for the same pid; this lock makes
      # "not started yet?" + registration a single step.
      start_lock = Lock()

      def parse_pid():
        '''Read ``pid`` from the query string.

        Returns (pid, None) on success or (None, error_response) otherwise.
        '''
        raw_pid = request.args.get('pid')
        if raw_pid is None:
          return None, (jsonify({'message': 'No pid provided.'}), 400)
        try:
          return int(raw_pid), None
        except ValueError:
          return None, (jsonify({'message': 'Invalid pid.'}), 400)

      @app.route('/start_process')
      def start_process():
        pid, error = parse_pid()
        if error is not None:
          return error

        with start_lock:
          if pid in progress_dict:
            return jsonify({'message': f'Process with pid {pid} already started.'}), 400
          # Register *before* starting the worker so this initial 0 can never
          # overwrite progress the worker has already reported.
          progress_dict[pid] = 0

        # Start a new subprocess
        try:
          Process(target=start_subprocess, args=(pid, progress_dict)).start()
        except Exception:
          # Don't leave an entry behind that would block a retry with this pid.
          progress_dict.pop(pid, None)
          raise

        return jsonify({'message': f'Process started with pid: {pid}'}), 200

      @app.route('/get_progress')
      def get_progress():
        pid, error = parse_pid()
        if error is not None:
          return error

        # Retrieve current progress of the subprocess
        current_progress = get_current_progress_of_subprocess(pid, progress_dict)
        if current_progress is None:
          return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200

      return app
    
    
    if __name__ == '__main__':
      # Development entry point only: Flask's built-in server. A production
      # WSGI server would call create_app() itself to obtain the application.
      application = create_app()
      application.run(threaded=True, debug=False)