Suppose I have a video-processing handler in a function, and I want to implement true parallel processing using the `multiprocessing` module instead of `threading`.
In general, my code looks like this:
def start_subprocess(pid, progress_dict, *, step=10, interval=1):
    '''
    Simulate a long-running video-processing job and publish its progress.

    Progress starts at 0 and grows by ``step`` every ``interval`` seconds
    until it reaches 100. After each increment the new value is written to
    ``progress_dict[pid]`` so that other processes can read it.

    :param pid: key under which this job's progress is published.
    :param progress_dict: mapping shared with the parent process (in this
        script, a ``multiprocessing.Manager().dict()`` proxy).
    :param step: percentage points added per tick; must be positive.
    :param interval: seconds to sleep before each tick.
    :raises ValueError: if ``step`` is not positive (the loop would never end).
    '''
    from time import sleep

    if step <= 0:
        raise ValueError(f'step must be positive, got {step!r}')
    progress = 0
    while progress < 100:
        sleep(interval)
        # Clamp so a step that does not divide 100 still finishes at exactly 100.
        progress = min(progress + step, 100)
        progress_dict[pid] = progress
def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Return the most recent progress value published for ``pid``.

    In the video-processing context this is meant to represent how much of
    the job (e.g. how many frames) has been processed so far.

    :param pid: key the job was registered under.
    :param progress_dict: mapping (or manager dict proxy) holding progress.
    :returns: the stored progress, or ``None`` if ``pid`` is not present.
    '''
    # A single .get() is one lookup (one IPC round-trip on a manager proxy)
    # instead of a membership test followed by an index. It also cannot fail
    # if the key disappears between those two steps.
    return progress_dict.get(pid)
def flask_service(progress_dict):
    '''
    Run a Flask development server exposing the job-control endpoints.

    Routes:
      * ``GET /start_process?pid=<int>`` - run ``start_subprocess`` for
        ``pid`` in a new ``multiprocessing.Process``.
      * ``GET /get_progress?pid=<int>`` - report the progress stored for ``pid``.

    :param progress_dict: mapping shared with the job processes (a
        ``Manager().dict()`` proxy in this script). It serves both as the
        registry of started pids and as the progress store.
    '''
    from threading import Lock
    from flask import Flask, request, jsonify
    from multiprocessing import Process
    app = Flask(__name__)
    # The server runs with threaded=True, so two requests for the same pid can
    # interleave between "is it registered?" and "register it". This lock makes
    # that check-and-register step atomic within this process.
    start_lock = Lock()

    def _parse_pid():
        '''Return ``(pid, None)`` on success or ``(None, error_response)``.'''
        raw_pid = request.args.get('pid')
        if raw_pid is None:
            return None, (jsonify({'message': 'No pid provided.'}), 400)
        try:
            return int(raw_pid), None
        except ValueError:
            return None, (jsonify({'message': 'Invalid pid.'}), 400)

    @app.route('/start_process')
    def start_process():
        pid, error = _parse_pid()
        if error is not None:
            return error
        with start_lock:
            if pid in progress_dict:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400
            # Register *before* starting the child. Writing 0 after start()
            # could overwrite progress the child had already published.
            progress_dict[pid] = 0
        try:
            Process(target=start_subprocess, args=(pid, progress_dict)).start()
        except Exception:
            # Undo the reservation so the pid can be retried, then let Flask
            # report the failure.
            progress_dict.pop(pid, None)
            raise
        return jsonify({'message': f'Process started with pid: {pid}'}), 200

    @app.route('/get_progress')
    def get_progress():
        pid, error = _parse_pid()
        if error is not None:
            return error
        current_progress = get_current_progress_of_subprocess(pid, progress_dict)
        if current_progress is None:
            return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200

    app.run(debug=False, threaded=True)
if __name__ == '__main__':
    from multiprocessing import Manager, Process

    # The Manager owns the shared dict; the Flask server runs in its own
    # process and receives the dict proxy as its only argument.
    with Manager() as manager:
        shared_progress = manager.dict()
        server = Process(target=flask_service, args=(shared_progress,))
        server.start()
        try:
            server.join()
        except KeyboardInterrupt:
            # Ctrl+C: stop the server process and wait for it to exit.
            server.terminate()
            server.join()
        finally:
            print('Ending up!')
This achieves what I want, but how do I deploy it with a WSGI server? As far as I know, the `Flask` class (`from flask import Flask`) creates a WSGI-compatible application instance. So what would this look like in deployment?
Also, am I actually implementing true parallel processing? I just want to make sure I really am. By "true parallel" I mean using the hardware's capabilities (e.g., multiple CPU cores) to handle parallel workloads such as video processing.
I think your code is more complicated than it needs to be. You should create a function, `create_app`, which creates the Flask application and the `progress_dict` managed dictionary and then returns the Flask application. Function `get_current_progress_of_subprocess` remains unchanged, and I added handling of `KeyboardInterrupt` in `start_subprocess`.
def start_subprocess(pid, progress_dict, *, step=10, interval=1):
    '''
    Simulate a long-running video-processing job and publish its progress.

    Progress starts at 0 and grows by ``step`` every ``interval`` seconds
    until it reaches 100. After each increment the new value is written to
    ``progress_dict[pid]`` so that other processes can read it.

    A ``KeyboardInterrupt`` (e.g. when Ctrl+C in the terminal also reaches
    this child process) ends the job quietly instead of printing a traceback.

    :param pid: key under which this job's progress is published.
    :param progress_dict: mapping shared with the parent process (in this
        script, a ``multiprocessing.Manager().dict()`` proxy).
    :param step: percentage points added per tick; must be positive.
    :param interval: seconds to sleep before each tick.
    :raises ValueError: if ``step`` is not positive (the loop would never end).
    '''
    from time import sleep

    if step <= 0:
        raise ValueError(f'step must be positive, got {step!r}')
    try:
        progress = 0
        while progress < 100:
            sleep(interval)
            # Clamp so a step that does not divide 100 still finishes at exactly 100.
            progress = min(progress + step, 100)
            progress_dict[pid] = progress
    except KeyboardInterrupt:
        # Deliberate: an interrupted job simply stops.
        pass
def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Return the most recent progress value published for ``pid``.

    In the video-processing context this is meant to represent how much of
    the job (e.g. how many frames) has been processed so far.

    :param pid: key the job was registered under.
    :param progress_dict: mapping (or manager dict proxy) holding progress.
    :returns: the stored progress, or ``None`` if ``pid`` is not present.
    '''
    # A single .get() is one lookup (one IPC round-trip on a manager proxy)
    # instead of a membership test followed by an index. It also cannot fail
    # if the key disappears between those two steps.
    return progress_dict.get(pid)
def create_app():
    '''
    Build and return the Flask application (application-factory pattern).

    Starts a ``multiprocessing.Manager`` and shares its dict proxy with every
    job process launched through ``/start_process``. The function then
    registers the routes and returns the app.

    Routes:
      * ``GET /start_process?pid=<int>`` - run ``start_subprocess`` for
        ``pid`` in a new ``multiprocessing.Process``.
      * ``GET /get_progress?pid=<int>`` - report the progress stored for ``pid``.

    NOTE(review): each call creates its own Manager and therefore its own
    progress dict. If a WSGI server runs several worker *processes* that each
    call ``create_app()``, then ``/get_progress`` only sees jobs started
    through the same worker. Confirm the deployment uses a single worker
    process (threads are fine), or move the shared state to an external store.
    '''
    from threading import Lock
    from flask import Flask, request, jsonify
    from multiprocessing import Process, Manager
    app = Flask(__name__)
    # Create the managed dictionary. Keep an explicit reference to the Manager
    # for the app's lifetime, rather than relying on the proxy to hold it
    # implicitly, so its server process is not shut down early.
    manager = Manager()
    progress_dict = manager.dict()
    app.extensions['progress_manager'] = manager
    # Requests may be served concurrently by threads, so make "is this pid
    # registered?" + "register it" atomic within this process.
    start_lock = Lock()

    def _parse_pid():
        '''Return ``(pid, None)`` on success or ``(None, error_response)``.'''
        raw_pid = request.args.get('pid')
        if raw_pid is None:
            return None, (jsonify({'message': 'No pid provided.'}), 400)
        try:
            return int(raw_pid), None
        except ValueError:
            return None, (jsonify({'message': 'Invalid pid.'}), 400)

    @app.route('/start_process')
    def start_process():
        pid, error = _parse_pid()
        if error is not None:
            return error
        with start_lock:
            if pid in progress_dict:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400
            # Register *before* starting the child. Writing 0 after start()
            # could overwrite progress the child had already published.
            progress_dict[pid] = 0
        try:
            Process(target=start_subprocess, args=(pid, progress_dict)).start()
        except Exception:
            # Undo the reservation so the pid can be retried, then let Flask
            # report the failure.
            progress_dict.pop(pid, None)
            raise
        return jsonify({'message': f'Process started with pid: {pid}'}), 200

    @app.route('/get_progress')
    def get_progress():
        pid, error = _parse_pid()
        if error is not None:
            return error
        current_progress = get_current_progress_of_subprocess(pid, progress_dict)
        if current_progress is None:
            return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200

    return app
if __name__ == '__main__':
    # Run Flask's built-in development server; a production WSGI server would
    # obtain the app from create_app() instead of calling run().
    application = create_app()
    application.run(threaded=True, debug=False)