python async-await fastapi nonblocking

Implementing a waiting mechanism in an application similar to Omegle using the LSH algorithm and MinHash


I'm developing an application similar to Omegle, where users are matched with strangers based on their common interests. To achieve this, I'm combining the LSH (Locality Sensitive Hashing) algorithm with the MinHash technique. However, I'm having difficulty implementing a waiting mechanism for users who don't immediately find a matching partner when they call the API.

Currently, I'm using the sleep function to introduce a waiting period before returning the status "Failed". However, it seems that the sleep function is blocking other API calls and causing delays for other users. I'm curious to know how websites like Omegle handle this scenario and what would be the correct procedure to implement an efficient waiting mechanism.

Here's my code snippet:

from fastapi import FastAPI, Body
from typing import Annotated
from pydantic import BaseModel
from sonyflake import SonyFlake
import redis
import time
from datasketch import MinHash, MinHashLSH

app = FastAPI()
sf = SonyFlake()
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
    'type': 'redis',
    'redis': {'host': '127.0.0.1', 'port': 6379}
}, prepickle=True)


class Partner(BaseModel):
    client_id: int
    partner_id: str
    status: str = 'Failed'


@app.post("/start", response_model=Partner)
async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
    client_id=sf.next_id()
    partner_id = ''

    minhash = MinHash()
    if not interests:
        return Partner(client_id = client_id, partner_id = partner_id)

    client_hash = f"user:{client_id}:interests:hash"
    minhash.update_batch([*(map(lambda item: item.encode('utf-8'), interests))])
    lsh.insert(client_hash, minhash)

    matches = lsh.query(minhash)
    matches.remove(client_hash)

    if not matches:
        time.sleep(5)

    matches = lsh.query(minhash)
    matches.remove(client_hash)

    if not matches:
        lsh.remove(client_hash)
        return Partner(client_id = client_id, partner_id = partner_id)

    lsh.remove(client_hash)
    lsh.remove(matches[0])
    return Partner(client_id = client_id, partner_id = matches[0], status="Success")

I would appreciate any insights or suggestions on how to properly implement the waiting mechanism, ensuring that it doesn't negatively impact the performance and responsiveness of the application. Is there a recommended approach or best practice to achieve this functionality while maintaining the responsiveness of the API for other users?

  • Please share any insights or best practices on implementing an efficient waiting mechanism in this scenario.
  • Any suggestions on optimizing the code or improving its responsiveness would be greatly appreciated.
  • Please provide resources or links to read more about it.

Thank you.


Solution

  • From what I can tell, you're using a synchronous wait: time.sleep inside an async def endpoint blocks the event loop, so every other request stalls until it returns. Use an asynchronous approach instead, either a WebSocket or HTTP long-polling. I prefer WebSockets for a few reasons, mainly that the connection is bidirectional and stays open, so the server can send status updates without the client re-requesting. I have tried to implement that for you below:

    import asyncio

    from fastapi import FastAPI, WebSocket, Body
    from typing import Annotated
    from pydantic import BaseModel
    from sonyflake import SonyFlake
    import redis
    from datasketch import MinHash, MinHashLSH
    
    app = FastAPI()
    sf = SonyFlake()
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
        'type': 'redis',
        'redis': {'host': '127.0.0.1', 'port': 6379}
    }, prepickle=True)
    
    class Partner(BaseModel):
        client_id: int
        partner_id: str
        status: str = 'Failed'
    
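    # Process-local store of each waiting client's MinHash. MinHashLSH offers no
    # way to read a stored MinHash back, so it is kept here (assumption: a single
    # worker process; use a shared store such as Redis otherwise).
    waiting: dict[int, MinHash] = {}

    @app.post("/start", response_model=Partner)
    async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
        client_id = sf.next_id()
        partner_id = ''

        if not interests:
            return Partner(client_id=client_id, partner_id=partner_id)

        client_hash = f"user:{client_id}:interests:hash"
        minhash = MinHash()
        minhash.update_batch([item.encode('utf-8') for item in interests])
        lsh.insert(client_hash, minhash)
        waiting[client_id] = minhash

        # Matching happens over the WebSocket; the client connects with this ID.
        return Partner(client_id=client_id, partner_id=partner_id)

    @app.websocket("/ws/{client_id}")
    async def websocket_endpoint(websocket: WebSocket, client_id: int):
        await websocket.accept()
        client_hash = f"user:{client_id}:interests:hash"

        while True:
            # Look up the MinHash saved by /start; it is gone once the client is matched.
            minhash = waiting.get(client_id)
            if minhash is None:
                await websocket.send_json({"status": "Error", "message": "Client ID not found"})
                return

            # Exclude the client's own key from the candidates.
            matches = [key for key in lsh.query(minhash) if key != client_hash]

            if not matches:
                await websocket.send_json({"status": "Waiting"})
            else:
                partner_hash = matches[0]
                lsh.remove(client_hash)
                lsh.remove(partner_hash)
                waiting.pop(client_id, None)
                # Keys have the form "user:<id>:interests:hash".
                waiting.pop(int(partner_hash.split(":")[1]), None)
                await websocket.send_json({"status": "Success", "client_id": client_id, "partner_id": partner_hash})
                return

            # Non-blocking pause: other connections are served while this one waits.
            await asyncio.sleep(5)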
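
If you would rather stay with plain HTTP, the long-polling alternative mentioned above could look roughly like the sketch below. It uses only the standard library so that it runs on its own; wait_for_match, find_match, and the two lookup functions are hypothetical names standing in for the LSH query, not part of FastAPI or datasketch. The point it illustrates is that awaiting asyncio.sleep between attempts hands control back to the event loop, so two waiting users are served concurrently rather than one after the other.

```python
import asyncio
import time
from typing import Callable, Optional


async def wait_for_match(
    find_match: Callable[[], Optional[str]],
    timeout: float = 5.0,
    interval: float = 0.5,
) -> Optional[str]:
    """Poll find_match() until it returns a partner or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        partner = find_match()
        if partner is not None:
            return partner
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # asyncio.sleep yields to the event loop, so other requests keep running.
        await asyncio.sleep(min(interval, remaining))


async def demo() -> tuple[list[Optional[str]], float]:
    start = time.monotonic()

    # Hypothetical stand-in for the LSH lookup: a partner shows up after 0.2 s.
    def lookup_found() -> Optional[str]:
        return "user:42" if time.monotonic() - start >= 0.2 else None

    # Hypothetical lookup that never finds anyone, to exercise the timeout path.
    def lookup_never() -> Optional[str]:
        return None

    # Both waiters run concurrently on the same event loop.
    results = await asyncio.gather(
        wait_for_match(lookup_found, timeout=1.0, interval=0.05),
        wait_for_match(lookup_never, timeout=0.5, interval=0.05),
    )
    return list(results), time.monotonic() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(demo())
    print(results, f"{elapsed:.2f}s")
```

Inside an endpoint you would await wait_for_match(...) with a function that runs the LSH query, and return "Failed" only when it comes back as None. Replacing asyncio.sleep with time.sleep in the helper would bring back the original problem, because the second waiter could not start until the first one finished.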