Search code examples
python amazon-elastic-beanstalk fastapi consumer python-huey

start process consumer from code and get signal callbacks


How can I start process-based consumers from code for CPU-bound tasks?

And how can I receive signal callbacks without immediate mode? If I run MemoryHuey with immediate=True, everything works fine, but if I set it to False, I only get empty lists.

Problem:

I have several endpoints which have to be processed with different priorities. The processes are all CPU intensive and have to be moved to the background and processed using multiprocessing. A Redis cluster is later used as job storage.

Thanks :) PS: ignore async

Edit: Or is there an easy way to run the consumer with supervisor on AWS Elastic Beanstalk (EB)?

# In-memory huey instance with immediate mode switched off.
huey = MemoryHuey(
    'worker',
    results=True,
    store_none=False,
    verbose=True,
    immediate=False,
)

# Build a consumer configured for process workers, one per CPU core.
# The returned object is only inspected below; nothing here starts it.
x = huey.create_consumer(
    workers=mp.cpu_count(),
    periodic=False,
    initial_delay=0.1,
    backoff=1.15,
    max_delay=10.0,
    scheduler_interval=1,
    worker_type=WORKER_PROCESS,
    check_worker_health=True,
    health_check_interval=10,
    flush_locks=False,
)

# Show the consumer object and its type, one per line.
print(x, type(x), sep="\n")
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
# Presumably the location/name of the model used by the service; it is not
# referenced anywhere in the visible part of the file.
MODEL_PATH = "model"

# Main FastAPI application object that the endpoints are registered on.
app = FastAPI()

# Separate router whose ``routes`` list is handed to the POST endpoints via
# their ``callbacks=`` argument.
# NOTE(review): no routes are added to this router in the visible code, so
# including it at this point registers nothing — confirm this is intended.
router = APIRouter()
app.include_router(router)

# pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)

# only for testing
#pool = ConnectionPool(host='redis', port=6379, max_connections=100)

# Create the Redis-backed priority queue and a process-based consumer
# configuration (one worker per CPU core).
try:
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379,
    )
    # NOTE(review): create_consumer() returns a Consumer object that is
    # discarded here and never started in the visible code — confirm that the
    # workers are launched elsewhere (e.g. by a separately run consumer).
    huey.create_consumer(
        workers=mp.cpu_count(),
        periodic=False,
        initial_delay=0.1,
        backoff=1.15,
        max_delay=10.0,
        scheduler_interval=1,
        worker_type=WORKER_PROCESS,
        check_worker_health=True,
        health_check_interval=10,
        flush_locks=False,
    )
except Exception as e:
    # Generic exceptions carry no ``status_code``/``detail`` attributes, so
    # reading them here would raise AttributeError and hide the real error.
    # Report the exception itself and re-raise: the rest of the module needs
    # ``huey`` to exist, so continuing without it cannot work.
    print("exception during connection catched", repr(e))
    raise

# Enable CORS for every origin, method and header, with credentials allowed.
# NOTE(review): combining wildcard origins with allow_credentials=True is very
# permissive and is discouraged by the CORS middleware documentation — confirm
# this is meant for testing only and restrict the origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    """Return a static message that identifies the service."""
    greeting = {"message": "OCR Engine and NER service"}
    return greeting


# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup

# Module-level log of job events, filled by the huey signal handler.
jobs = []


@app.get("/job_monitoring")
async def monitor_jobs():
    """Return the list of recorded job events.

    Once more than 200 entries have accumulated the list is emptied in place
    before it is returned, so that particular call yields an empty list.
    """
    too_many = len(jobs) > 200
    if too_many:
        jobs.clear()
    return jobs

@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Record one huey signal as a tuple in the job log.

    Parameters
    ----------
    signal : str
        name of the signal that fired
    task : huey task instance
        task the signal refers to; ``id``, ``name`` and ``args`` are read
    exc : Exception, optional
        exception passed along with the signal, if any
    jobs : list
        list the entry is appended to; bound to the module-level ``jobs``
        list when the function is defined

    NOTE(review): with process workers this handler presumably runs inside the
    worker process, so entries appended here would not show up in the list
    held by the web process — verify, and use shared storage if needed.
    """
    dt_string = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    # A task enqueued without positional arguments has an empty ``args``;
    # indexing it blindly would raise IndexError inside the handler.
    first_arg = task.args[0] if task.args else None
    jobs.append((dt_string, task.id, task.name, first_arg, signal, exc))


@app.post("/ner", callbacks=router.routes)
async def ner(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 30,
    ocr: bool = True,
):
    """Hand a named-entity-recognition job for the invoice to the task queue.

    All request parameters are forwarded to ``__process_ner_request``; the
    reply is a message containing the task id and ``invoice.path``.
    """
    queued = __process_ner_request(
        invoice,
        token,
        callback_url,
        max_sentence_number,
        return_merged,
        crop_image,
        ocr,
    )
    text = "NER Process : %s : %s sent in the background" % (queued.id, invoice.path)
    return {"message": text}


@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 0,
    ocr: bool = False,
):
    """Hand a "complete" NER job for the invoice (not for txt files) to the task queue.

    Same forwarding as the ``/ner`` endpoint, but by default without cropping
    (``crop_image=0``) and without OCR (``ocr=False``).
    """
    queued = __process_ner_request(
        invoice,
        token,
        callback_url,
        max_sentence_number,
        return_merged,
        crop_image,
        ocr,
    )
    text = "NER Complete Process : %s : %s sent in the background" % (queued.id, invoice.path)
    return {"message": text}


@app.post("/txt", callbacks=router.routes)
async def ocr_txt(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """Hand an OCR job in ``'txt'`` mode for the invoice to the task queue.

    The reply is a message containing the task id and ``invoice.path``.
    """
    queued = __process_ocr_request(
        invoice,
        'txt',
        token,
        callback_url,
        res_width,
        check_before_ocr,
    )
    text = "OCR Process : %s : %s sent in the background" % (queued.id, invoice.path)
    return {"message": text}


@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """Hand an OCR job in ``'pdfa'`` mode for the invoice to the task queue.

    The reply is a message containing the task id and ``invoice.path``.
    """
    queued = __process_ocr_request(
        invoice,
        'pdfa',
        token,
        callback_url,
        res_width,
        check_before_ocr,
    )
    text = "OCR Process : %s : %s sent in the background" % (queued.id, invoice.path)
    return {"message": text}


@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """Run one OCR job and post its result (or error) to the callback URL.

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, sent back as a Bearer token in the callback
    callback_url : str or None
        url for result callback; when None nothing is posted
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR

    Returns
    -------
    None
        the outcome is delivered through the callback only
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
        # Elapsed time in milliseconds.
        exec_time_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': exec_time_ms}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}

    # The endpoints declare callback_url as optional; posting to None would
    # raise after all the work has been done, so skip delivery instead.
    if callback_url is None:
        print("no callback_url given, result not delivered")
        return

    # str() keeps this working if the URL arrives as a URL object rather than
    # a plain string; the timeout keeps an unresponsive callback server from
    # blocking the worker indefinitely.
    requests.post(str(callback_url), headers=headers, json=payload, timeout=30)


@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr):
    """Run one NER job and post its result (or error) to the callback URL.

    Parameters
    ----------
    invoice : class
        API input
    token : str
        validation token, sent back as a Bearer token in the callback
    callback_url : str or None
        url for result callback; when None nothing is posted
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr

    Returns
    -------
    None
        the outcome is delivered through the callback only
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
        # Elapsed time in milliseconds.
        exec_time_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': exec_time_ms}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}

    # The endpoints declare callback_url as optional; posting to None would
    # raise after all the work has been done, so skip delivery instead.
    if callback_url is None:
        print("no callback_url given, result not delivered")
        return

    # str() keeps this working if the URL arrives as a URL object rather than
    # a plain string; the timeout keeps an unresponsive callback server from
    # blocking the worker indefinitely.
    requests.post(str(callback_url), headers=headers, json=payload, timeout=30)

Edit: I use Elastic Beanstalk for deployment. Can the consumer be run with supervisor there?


Solution

  • You are trying to share memory across processes, which isn't going to work. Each process has its own copy of the memory — this is why you need to use a broker like Redis, SQLite or the file system when you are using multi-process huey.