I'm developing an application similar to Omegle, where users are matched with strangers based on their common interests. To achieve this, I'm combining the LSH (Locality Sensitive Hashing) algorithm with the Minhash technique. However, I'm facing difficulties in implementing a waiting mechanism for users who don't immediately find a matching pair when they call the API.
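For readers unfamiliar with the technique, here is a minimal pure-Python sketch of what MinHash and the LSH banding step compute. It is not datasketch's implementation. The salted blake2b hash family, the 128-slot signature, the 32-band split, and the helper names (`signature`, `estimate_jaccard`, `band_keys`) are illustrative assumptions.

```python
import hashlib

NUM_PERM = 128  # signature length, matching num_perm=128 in the question


def _hash(token: str, seed: int) -> int:
    # Member `seed` of a hash family: 64-bit blake2b salted with the seed.
    digest = hashlib.blake2b(token.encode("utf-8"), digest_size=8,
                             salt=seed.to_bytes(16, "little")).digest()
    return int.from_bytes(digest, "big")


def signature(interests: set[str], num_perm: int = NUM_PERM) -> list[int]:
    # Slot i keeps the smallest hash of any interest under hash function i.
    # `interests` must be non-empty (the question's endpoint returns early otherwise).
    return [min(_hash(t, i) for t in interests) for i in range(num_perm)]


def estimate_jaccard(sig_a: list[int], sig_b: list[int]) -> float:
    # Each slot matches with probability approximately equal to the Jaccard
    # similarity, so the fraction of matching slots estimates it.
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)


def band_keys(sig: list[int], bands: int = 32) -> set[tuple[int, ...]]:
    # LSH step: cut the signature into bands; two users become match candidates
    # if any band (tagged with its index) is identical.
    rows = len(sig) // bands
    return {(b, *sig[b * rows:(b + 1) * rows]) for b in range(bands)}


alice = signature({"music", "chess", "hiking"})
bob = signature({"music", "chess", "cooking"})
carol = signature({"knitting", "poetry"})
print(estimate_jaccard(alice, bob))    # true Jaccard: 2 / 4 = 0.5
print(estimate_jaccard(alice, carol))  # true Jaccard: 0
print(bool(band_keys(alice) & band_keys(bob)))  # candidate pair? (probabilistic)
```

datasketch's `MinHashLSH` picks the number of bands and rows from `threshold` on its own; the fixed 32 × 4 split above is only for illustration.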
Currently, I'm using the sleep function to introduce a waiting period before returning the status "Failed". However, it seems that the sleep function is blocking other API calls and causing delays for other users. I'm curious to know how websites like Omegle handle this scenario and what would be the correct procedure to implement an efficient waiting mechanism.
Here's the code snippet:
```python
from fastapi import FastAPI, Body
from typing import Annotated
from pydantic import BaseModel
from sonyflake import SonyFlake
import redis
import time
from datasketch import MinHash, MinHashLSH

app = FastAPI()
sf = SonyFlake()
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
    'type': 'redis',
    'redis': {'host': '127.0.0.1', 'port': 6379}
}, prepickle=True)


class Partner(BaseModel):
    client_id: int
    partner_id: str
    status: str = 'Failed'


@app.post("/start", response_model=Partner)
async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
    client_id = sf.next_id()
    partner_id = ''
    minhash = MinHash()
    if not interests:
        return Partner(client_id=client_id, partner_id=partner_id)
    client_hash = f"user:{client_id}:interests:hash"
    minhash.update_batch([*(map(lambda item: item.encode('utf-8'), interests))])
    lsh.insert(client_hash, minhash)
    matches = lsh.query(minhash)
    matches.remove(client_hash)
    if not matches:
        time.sleep(5)
        matches = lsh.query(minhash)
        matches.remove(client_hash)
        if not matches:
            lsh.remove(client_hash)
            return Partner(client_id=client_id, partner_id=partner_id)
    lsh.remove(client_hash)
    lsh.remove(matches[0])
    return Partner(client_id=client_id, partner_id=matches[0], status="Success")
```
I would appreciate any insights or suggestions on how to properly implement the waiting mechanism, ensuring that it doesn't negatively impact the performance and responsiveness of the application. Is there a recommended approach or best practice to achieve this functionality while maintaining the responsiveness of the API for other users?
Thank you.
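A minimal, FastAPI-free sketch of why the `time.sleep(5)` call hurts other users: inside an `async def`, `time.sleep` blocks the single event-loop thread, whereas `await asyncio.sleep` suspends only the current coroutine. The handler names are hypothetical, and the 0.2 s delay stands in for the 5 s wait.

```python
import asyncio
import time

DELAY = 0.2  # stands in for the 5-second wait in the question


async def blocking_wait() -> None:
    time.sleep(DELAY)  # holds the event-loop thread; nothing else can run


async def non_blocking_wait() -> None:
    await asyncio.sleep(DELAY)  # suspends this coroutine only


async def three_users(wait) -> float:
    # Simulate three users hitting the endpoint at the same moment.
    start = time.perf_counter()
    await asyncio.gather(wait(), wait(), wait())
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(three_users(blocking_wait))
non_blocking_elapsed = asyncio.run(three_users(non_blocking_wait))
print(f"time.sleep:    {blocking_elapsed:.2f} s")
print(f"asyncio.sleep: {non_blocking_elapsed:.2f} s")
```

The three blocking handlers run one after another (about 0.6 s in total), while the non-blocking ones overlap (about 0.2 s). FastAPI runs plain `def` endpoints in a thread pool, so dropping `async` would also keep the sleep off the event loop, but each waiting user would still occupy a worker thread.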
From what I can tell, you're implementing a synchronous waiting mechanism: `time.sleep` inside an `async def` endpoint blocks the event loop, so no other request is served while one user waits. Replacing it with `await asyncio.sleep(5)` would remove that stall, but a cleaner design is to let the client wait asynchronously, either over a WebSocket or through HTTP long polling. I think WebSockets are the better fit here, mainly because they give you bidirectional communication over a single persistent connection. I have tried to implement that for you below:
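```python
import asyncio

from fastapi import FastAPI, WebSocket, Body
from typing import Annotated
from pydantic import BaseModel
from sonyflake import SonyFlake
import redis
from datasketch import MinHash, MinHashLSH

app = FastAPI()
sf = SonyFlake()
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
    'type': 'redis',
    'redis': {'host': '127.0.0.1', 'port': 6379}
}, prepickle=True)


class Partner(BaseModel):
    client_id: int
    partner_id: str
    status: str = 'Failed'


def build_minhash(interests: list[str]) -> MinHash:
    # Same num_perm as the index and the default seed, so equal interests give equal MinHashes.
    minhash = MinHash(num_perm=128)
    minhash.update_batch([item.encode('utf-8') for item in interests])
    return minhash


@app.post("/start", response_model=Partner)
async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
    client_id = sf.next_id()
    if not interests:
        return Partner(client_id=client_id, partner_id='')
    # MinHashLSH cannot return a stored MinHash by key, so keep the raw
    # interests in Redis and rebuild the MinHash in the WebSocket handler.
    r.rpush(f"user:{client_id}:interests", *interests)
    lsh.insert(f"user:{client_id}:interests:hash", build_minhash(interests))
    return Partner(client_id=client_id, partner_id='')


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await websocket.accept()
    client_hash = f"user:{client_id}:interests:hash"
    interests = r.lrange(f"user:{client_id}:interests", 0, -1)
    if not interests:
        await websocket.send_json({"status": "Error", "message": "Client ID not found"})
        await websocket.close()
        return
    minhash = build_minhash(interests)
    try:
        while True:
            # Another client's loop may already have paired with us and left a note.
            partner = r.get(f"user:{client_id}:partner")
            if partner is None and client_hash in lsh:
                matches = [m for m in lsh.query(minhash) if m != client_hash]
                if matches:
                    partner = matches[0]
                    lsh.remove(client_hash)
                    lsh.remove(partner)
                    # Tell the partner's loop who it was matched with. No await happens
                    # between query and set, so this is atomic within one process only.
                    r.set(f"user:{partner.split(':')[1]}:partner", client_hash)
            if partner is not None:
                await websocket.send_json({"status": "Success", "client_id": client_id, "partner_id": partner})
                return
            await websocket.send_json({"status": "Waiting"})
            await asyncio.sleep(5)  # yields to the event loop; other users are still served
    finally:
        # Matched, disconnected, or failed: stop offering this client as a partner.
        if client_hash in lsh:
            lsh.remove(client_hash)
        r.delete(f"user:{client_id}:interests", f"user:{client_id}:partner")
```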
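Polling every five seconds works, but a waiting client can also be woken the moment a compatible partner arrives. The sketch below is one way to do that inside a single process, assuming one worker: each waiting client parks on an `asyncio.Future`, and the next compatible arrival resolves it. The `Matchmaker` class and its methods are hypothetical, and an exact Jaccard comparison over a linear scan stands in for `lsh.query`. With several worker processes you would need a shared mechanism such as Redis pub/sub instead.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


def jaccard(a: set[str], b: set[str]) -> float:
    # Exact Jaccard similarity; stands in for the MinHashLSH query.
    union = a | b
    return len(a & b) / len(union) if union else 0.0


@dataclass
class Waiter:
    client_id: int
    interests: set[str]
    future: asyncio.Future  # resolved with the partner's client_id


class Matchmaker:
    """Hypothetical single-process matchmaker: waiting clients park on a Future."""

    def __init__(self, threshold: float = 0.5) -> None:
        self.threshold = threshold
        self.waiting: dict[int, Waiter] = {}

    async def find_partner(self, client_id: int, interests: list[str],
                           timeout: float) -> int | None:
        wanted = set(interests)
        # 1. Pair with a compatible client that is already waiting, if any.
        for other in list(self.waiting.values()):
            if not other.future.done() and jaccard(wanted, other.interests) >= self.threshold:
                del self.waiting[other.client_id]
                other.future.set_result(client_id)  # wakes the other client immediately
                return other.client_id
        # 2. Otherwise park this client without blocking the event loop.
        future = asyncio.get_running_loop().create_future()
        self.waiting[client_id] = Waiter(client_id, wanted, future)
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None  # the endpoint would report status "Failed"
        finally:
            self.waiting.pop(client_id, None)


async def demo() -> tuple[int | None, int | None, int | None]:
    mm = Matchmaker()

    async def late_arrival() -> int | None:
        await asyncio.sleep(0.1)  # client 2 shows up while client 1 is waiting
        return await mm.find_partner(2, ["music", "chess"], timeout=1.0)

    first, second = await asyncio.gather(
        mm.find_partner(1, ["music", "chess", "go"], timeout=1.0),
        late_arrival(),
    )
    lonely = await mm.find_partner(3, ["knitting"], timeout=0.2)
    return first, second, lonely


result = asyncio.run(demo())
print(result)  # → (2, 1, None)
```

In a FastAPI app, a long-polling `/start` endpoint could simply `await` such a `find_partner` call and return `Failed` on `None`; a WebSocket endpoint could do the same and then push the result to the client.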