How can I start process-based consumers from code for CPU-bound tasks?
And how can I receive signal callbacks without `immediate` mode? If I run `MemoryHuey` with `immediate=True`, everything works fine, but if I set it to `False` I only get empty lists.
Problem:
I have several endpoints whose jobs have to be processed with different priorities. The jobs are all CPU-intensive, so they have to be moved to the background and processed using multiprocessing. A Redis cluster will later be used as the job storage.
Thanks :) PS: ignore the async parts.
Edit: Or is there an easy way to run the consumer with supervisor on AWS Elastic Beanstalk?
# Experiment: an in-memory huey instance with a process-based consumer.
# NOTE(review): MemoryHuey keeps its queue and results in this process's memory.
# Workers created with WORKER_PROCESS are separate processes with their own copy
# of that memory, so they cannot see tasks enqueued here. Anything their signal
# handlers record also stays inside the worker. Process workers need shared
# storage (Redis, SQLite, the file system).
huey = MemoryHuey('worker', results=True, store_none=False, verbose=True, immediate=False)
# create_consumer() only constructs a huey.consumer.Consumer object (as the
# printed type shows); nothing in this snippet starts it, so no task is executed.
# NOTE(review): presumably x.run() (which blocks) or the huey_consumer CLI is
# needed to actually start the workers; confirm against the huey consumer docs.
x = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15, max_delay=10.0, scheduler_interval=1, worker_type=WORKER_PROCESS, check_worker_health=True, health_check_interval=10, flush_locks=False)
print(x)
print(type(x))
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
MODEL_PATH = "model"
app = FastAPI()
# Router used only to declare the OpenAPI callback routes (callbacks=router.routes).
router = APIRouter()
app.include_router(router)
# pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)
# only for testing
# pool = ConnectionPool(host='redis', port=6379, max_connections=100)
try:
    # Redis-backed huey with task priorities; tasks below use priority=1 / priority=10.
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379
    )
    # NOTE(review): create_consumer() only builds a Consumer object. Its return
    # value is discarded here and it is never started, so this call executes
    # no tasks. Workers presumably have to be started separately (e.g. the
    # huey_consumer CLI under supervisor); confirm.
    huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                         max_delay=10.0, scheduler_interval=1, worker_type=WORKER_PROCESS,
                         check_worker_health=True, health_check_interval=10, flush_locks=False)
except Exception as e:
    # A generic Exception has no status_code/detail attributes (those belong
    # to HTTPException). Reading them raised AttributeError and hid the real
    # error, so print the exception itself.
    # NOTE(review): if this branch runs, `huey` is never bound by this block,
    # and the @huey decorators below will fail or use another instance.
    # The Redis connection may also be opened lazily, so connection errors may
    # not surface here at all; confirm.
    print("exception during connection catched", repr(e))
# Cross-origin policy for browser clients: every origin, method and header is
# accepted, and credentialed requests are allowed.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive. Authentication here travels in a bearer header, so confirm
# whether credentials are needed at all, or restrict allow_origins.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
@app.get("/")
async def root():
    """Service entrypoint: return a short description of this API."""
    payload = {"message": "OCR Engine and NER service"}
    return payload
# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup
# In-memory log of huey signal events, appended to by the signal handler.
# NOTE(review): this list lives in the memory of whichever process imports the
# module. Signal handlers that fire inside separate consumer worker processes
# append to their own copies, so this web process will only see entries when
# tasks run in-process (e.g. immediate mode).
jobs = []
# Upper bound on how many signal events the monitoring endpoint retains.
MAX_MONITORED_JOBS = 200


@app.get("/job_monitoring")
async def monitor_jobs():
    """Return the recorded signal events, keeping only the newest ones.

    Returns
    -------
    list
        At most ``MAX_MONITORED_JOBS`` tuples of
        ``(timestamp, task_id, task_name, first_arg, signal, exc)``.
    """
    # Trim the oldest entries rather than clearing the list, so a read never
    # comes back empty just because the cap was exceeded. A no-op when the
    # list is already within the cap.
    del jobs[:-MAX_MONITORED_JOBS]
    return jobs
@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Record one huey signal for one task into ``jobs``.

    Registered with no filter, so it receives every signal for every task.

    Parameters
    ----------
    signal : str
        the huey signal being sent
    task : huey task
        the task the signal refers to
    exc : Exception, optional
        presumably only supplied for error signals; confirm with the huey
        signals documentation
    jobs : list
        destination list; defaults to the module-level ``jobs`` (bound once,
        at definition time) so the monitoring endpoint can read it
    """
    dt_string = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    # The handler sees every task, and a task enqueued with keyword arguments
    # only has empty positional args; avoid an IndexError in that case.
    first_arg = task.args[0] if task.args else None
    jobs.append((dt_string, task.id, task.name, first_arg, signal, exc))
@app.post("/ner", callbacks=router.routes)
async def ner(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 30,
    ocr: bool = True,
):
    """Queue a Named Entity Recognition job for ``invoice``.

    The work is enqueued through the huey task ``__process_ner_request`` and
    this handler returns immediately with the queued task id. The task itself
    posts its result to ``callback_url``.
    """
    # Keep the arguments positional: the signal handler reads task.args[0].
    queued = __process_ner_request(
        invoice, token, callback_url, max_sentence_number,
        return_merged, crop_image, ocr,
    )
    template = "NER Process : %s : %s sent in the background"
    return {"message": template % (queued.id, invoice.path)}
@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 0,
    ocr: bool = False,
):
    """Queue an NER job using the "complete" defaults (no cropping, no OCR).

    Original note: "except txt files". Presumably intended for inputs that do
    not need OCR; confirm. Enqueues ``__process_ner_request`` and returns
    immediately with the queued task id.
    """
    # Keep the arguments positional: the signal handler reads task.args[0].
    queued = __process_ner_request(
        invoice, token, callback_url, max_sentence_number,
        return_merged, crop_image, ocr,
    )
    template = "NER Complete Process : %s : %s sent in the background"
    return {"message": template % (queued.id, invoice.path)}
@app.post("/txt", callbacks=router.routes)
async def ocr_txt(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """Queue an OCR job that produces plain text (mode ``'txt'``).

    Enqueues ``__process_ocr_request`` and returns immediately with the queued
    task id; the task posts its result to ``callback_url``.
    """
    output_mode = 'txt'
    # Keep the arguments positional: the signal handler reads task.args[0].
    queued = __process_ocr_request(
        invoice, output_mode, token, callback_url, res_width, check_before_ocr,
    )
    template = "OCR Process : %s : %s sent in the background"
    return {"message": template % (queued.id, invoice.path)}
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """Queue an OCR job that produces a PDF/A document (mode ``'pdfa'``).

    Enqueues ``__process_ocr_request`` and returns immediately with the queued
    task id; the task posts its result to ``callback_url``.
    """
    output_mode = 'pdfa'
    # Keep the arguments positional: the signal handler reads task.args[0].
    queued = __process_ocr_request(
        invoice, output_mode, token, callback_url, res_width, check_before_ocr,
    )
    template = "OCR Process : %s : %s sent in the background"
    return {"message": template % (queued.id, invoice.path)}
@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr,
                          callback_timeout=30):
    """Run one OCR job and POST the outcome to ``callback_url``.

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, forwarded to the callback as a Bearer token
    callback_url : str or None
        url for result callback; when None nothing is posted
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR
    callback_timeout : float, optional
        seconds ``requests`` waits on the callback server before giving up
        (default 30), so an unresponsive callback cannot block this worker
        indefinitely

    Notes
    -----
    On success the callback body is ``{'content': ..., 'execTime': ...}``.
    If ``handle_ocr`` raises ``HTTPException``, the body is an ``errors`` list
    instead. Any other exception propagates to huey.
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
        # Elapsed time, as measured by `timer`, converted to milliseconds.
        exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': exec_time}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}
    if callback_url is None:
        # The endpoints declare callback_url optional. Posting to None would
        # fail inside requests after all the CPU-heavy work has been done.
        print("no callback_url given, result not sent")
        return
    requests.post(callback_url, headers=headers, json=payload, timeout=callback_timeout)
@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr,
                          callback_timeout=30):
    """Run one NER job and POST the outcome to ``callback_url``.

    Parameters
    ----------
    invoice : class
        API input
    token : str
        validation token, forwarded to the callback as a Bearer token
    callback_url : str or None
        url for result callback; when None nothing is posted
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr
    callback_timeout : float, optional
        seconds ``requests`` waits on the callback server before giving up
        (default 30), so an unresponsive callback cannot block this worker
        indefinitely

    Notes
    -----
    On success the callback body is ``{'content': ..., 'execTime': ...}``.
    If ``get_entities`` raises ``HTTPException``, the body is an ``errors``
    list instead. Any other exception propagates to huey.
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
        # Elapsed time, as measured by `timer`, converted to milliseconds.
        exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': exec_time}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}
    if callback_url is None:
        # The endpoints declare callback_url optional. Posting to None would
        # fail inside requests after all the CPU-heavy work has been done.
        print("no callback_url given, result not sent")
        return
    requests.post(callback_url, headers=headers, json=payload, timeout=callback_timeout)
Edit: I use Elastic Beanstalk for deployment. Can the consumer run there under supervisor?
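You are trying to share memory across processes, which isn't going to work. Each process has its own copy of the memory. This is why you need a broker such as Redis, SQLite or the file system when you are using multiprocess huey. With `immediate=True`, tasks and their signal handlers run inside the web process itself, so the handler appends to the same `jobs` list the endpoint reads. With a separate consumer, the handlers run in the worker processes and append to their own copies.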