I have a class that performs some I/O-bound tasks, which I decided to speed up using Python's `concurrent.futures`. Without further ado, here are the important parts of the code (unnecessary parts removed):
class toolbox:
def __init__(self, username, password, fid, settings):
# Build the per-instance login payload, the default HTTP headers and the
# (initially empty) request payload. How `settings` is stored is elided here.
#...
# Authorization value; new_session() overwrites it with "Bearer <token>".
self.access_token = ""
# Credentials payload.
# NOTE(review): presumably posted by the elided login code -- confirm.
self.login_data = {
"username": username,
"password": password,
"sfid" : fid
}
# Headers passed with both the data request and the search request;
# "Authorization" starts empty and is filled in by new_session().
self.request_headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36",
"Accept":"application/json, text/plain, */*",
"Authorization":""
}
# Replaced in new_session() by the JSON returned from data_url, then sent
# as the JSON body of every search request in check_number().
self.request_data = {}
def new_session(self, n=0):
# Open a fresh requests.Session, store the bearer token in the shared
# headers and reload self.request_data from self.data_url.
# `n` counts retries: the method calls itself with n+1 and gives up
# once n reaches 3.
# NOTE(review): indentation was lost in this paste, so the nesting of the
# if/else branches below should be confirmed against the real file.
# NOTE(review): this mutates self.access_token, self.request_headers and
# self.request_data with no lock visible here, while check_number() reads
# them from worker threads -- confirm whether synchronisation exists elsewhere.
session = requests.Session()
#...
# `token` is produced by the elided code above.
if token:
self.access_token = "Bearer "+token
self.request_headers["Authorization"] = self.access_token
dreq = session.get(self.data_url, headers=self.request_headers)
if dreq.status_code==200:
# dict() makes a shallow copy of the decoded JSON.
self.request_data = dict( dreq.json() )
print("[ ALL SET! ]", flush=True)
else:
print(f"\n{R}[!]{end} Something went wrong, can't grab data required to start searching!", flush=True)
if n!=3:
print("[~] Trying to relogin and grab data again!", flush=True)
# Retry by recursion; each attempt creates another Session object.
self.new_session(n+1)
else:
# Blocks on user input before terminating.
input(f"{R}[!]{end} Maximum tries exceed! There's another thing wrong can't find it! Quiting...")
# NOTE(review): exit status 0 signals success although this is the
# failure path. When called from a worker thread, sys.exit() only raises
# SystemExit in that thread rather than stopping the whole program.
sys.exit(0)
#...
#...
def check_number(self, row):
# Look up one data row against self.search_url, retrying in a loop.
# This is the function handed to executor.map() in check_file(), so in
# threaded mode it runs concurrently on several worker threads.
# NOTE(review): `number` and the counter `n` are defined in elided code;
# indentation was lost in this paste, so confirm the nesting below.
#...
# No cookies yet: log in before the first request.
if not self.Cookies:
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
self.new_session()
#...
while True:
try:
# Plain requests.post (not a shared Session): headers, JSON body and
# cookies are read from instance attributes on every attempt.
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
# When n is already 1 it is reset and the thread logs in again;
# otherwise the failure is only counted.
if n==1:
n=0
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
# Each worker thread calls new_session() on its own here; there is
# no coordination between threads visible in this block.
self.new_session()
else:
n+=1
# Any failure other than 401 is raised as a RequestException so that
# the except branch below handles it.
if req.status_code!=401:
raise requests.exceptions.RequestException
#...
except requests.exceptions.RequestException as e:
#...
So while the function `check_number` is scraping, if it gets a 401 status code it logs in again to get a new session. That's because the API session has to be renewed periodically!
OK, so far so good. The problem appears when threading starts, which happens in the function below (in the same class):
def check_file(self, input_file):
# Process every row of the input, either through a thread pool or
# sequentially, depending on self.settings["threaded"].
# NOTE(review): `manager` is created in elided code; indentation was lost
# in this paste, so confirm the nesting below.
#...
if self.settings["threaded"]:
#...
with concurrent.futures.ThreadPoolExecutor(max_workers=self.settings["threads"]) as executor:
#...
# executor.map yields results in input order. `timeout` is measured from
# the map() call itself, and a TimeoutError is raised while iterating if
# a result is not ready in time.
# NOTE(review): per the concurrent.futures documentation, `chunksize`
# only affects ProcessPoolExecutor and has no effect on a
# ThreadPoolExecutor.
for num in executor.map(
self.check_number, manager.data_rows,
timeout=self.settings["running_timeout"],
chunksize=self.settings["save_interval"] ):
# Runs on the thread that consumes the results, while the workers may
# also be calling new_session() from check_number().
if not self.Cookies:
self.new_session()
if num:
#...
# Pause between consuming results.
time.sleep(self.settings["throttle"])
else:
# Unthreaded mode: iterate over the rows one by one.
start_time = time.time()
for row in manager.data_rows:
#...
#...
The program runs smoothly until the session ends; then all the threads try to renew the session at the same time, which of course causes a deadlock. So the question is: how can I refactor the code so that, when the session ends, I can renew it once for all threads without causing a deadlock? Also note that the user can choose to run this program unthreaded via the settings, as you can see from the `else` branch.
I searched and tried a lot of things, but I'm not very good with threading. I hope someone can help me, as this is a problem at my job, not a side project.
Thanks in advance
I solved it by adding a threaded function that checks every 5 seconds whether the session is still valid; if it is not, it renews the session while all the running worker threads wait and retry every 5 seconds.
The code behind this:
A decorator, defined outside the class, that runs the decorated function in a thread:
def threaded(fn):
    """Decorator that runs ``fn`` in a new thread on every call.

    Calling the decorated function starts a ``threading.Thread`` that
    executes ``fn`` with the given positional and keyword arguments.

    Returns:
        The already-started ``threading.Thread``, not ``fn``'s return
        value, so the caller can ``join()`` it or check ``is_alive()``.
    """
    # Local import: the file's top-level import block is not visible here.
    import functools

    # Keep fn's __name__/__doc__ on the wrapper so tracebacks, logs and
    # help() still show the real function instead of "run".
    @functools.wraps(fn)
    def run(*k, **kw):
        threaded_function = threading.Thread(target=fn, args=k, kwargs=kw)
        threaded_function.start()
        return threaded_function

    return run
A threaded function to keep the session viable:
@threaded
def session_keeper(self):
    """Background loop that keeps the shared session valid.

    Runs in its own thread (via ``@threaded``) for as long as
    ``self.keep_threading`` is truthy. Every 5 seconds it either logs in
    when there are no cookies yet, or probes ``self.data_url`` and logs in
    again when the probe does not return HTTP 200. Worker threads therefore
    never need to renew the session themselves.

    The probe uses the same ``request_timeout`` setting as the search
    requests and tolerates network errors. Without that, a single hung or
    failed probe would freeze or kill this thread and the session would
    never be renewed again.
    """
    while self.keep_threading:
        time.sleep(5)

        if not self.Cookies:
            print(f"{G}{Bold}[SESSION]{end} Creating a new session", flush=True)
            self.new_session()
            continue

        try:
            dreq = requests.get(
                self.data_url,
                cookies=self.Cookies,
                headers=self.request_headers,
                timeout=self.settings["request_timeout"],
            )
        except requests.exceptions.RequestException as e:
            # Transient network problem: keep the keeper alive and probe
            # again on the next tick instead of letting the thread die.
            print(f"{R}{Bold}[!]{end} Session check failed ({e}), retrying...", flush=True)
            continue

        if dreq.status_code != 200:
            print(f"{R}{Bold}[!]{end} Session ended, creating new one...", flush=True)
            self.new_session()
The retry loop in the `check_number` function is modified to do this instead:
# Revised retry loop for check_number().
# No new_session() call is visible in this fragment; workers only wait and
# retry, which matches the answer's design of leaving renewal to
# session_keeper().
# NOTE(review): indentation was lost in this paste and the except clause of
# this try is not shown; confirm both against the real file.
while True:
try:
# Headers, payload and cookies are re-read from the instance on every
# attempt, so a retry uses whatever session state is current.
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
# Back off for 5 seconds, the same interval session_keeper() sleeps
# between checks.
time.sleep(5)
# Any failure other than 401 is raised as a RequestException; a 401
# simply retries the request.
if req.status_code!=401:
raise requests.exceptions.RequestException
else:
continue
I hope this could help someone :)