Search code examples
pythonmysqlmultithreadingmultiprocessingpython-multithreading

How to use MYSQL select query with threading without duplicating the select query results?


Short context: I am using a MySQL table to select a value; using an API plus that value I fetch a result, and the result is saved back into the same table.

Problem: How can I process multiple rows simultaneously? Whenever I start the function in several threads, each thread selects the same value (i.e. the cursor returns the same row to every thread). I need a different value for each thread to process, so that the overall run time is reduced.

My program is

import requests
import os
import json
import pymysql
import threading

# NOTE(review): USER and PASSWORD are not defined in this snippet — they
# must be supplied elsewhere before this line runs.
conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb',charset='utf8mb4',autocommit=True)

# Base URL; the asset id is appended to it for each API call.
url = "http://www.someapi.com/somelink/"

# NOTE(review): one cursor shared by every thread — pymysql connections
# and cursors are not thread-safe; confirm this is intentional.
cur = conn.cursor()

def main():
    """Fetch one 'uploaded' asset id, query the API with it, and mark
    the row completed with the class the API returned.

    Reads the module-level ``cur`` (shared DB cursor) and ``url``.
    Raises TypeError if no row with status 'uploaded' exists
    (``fetchone()`` returns None).
    """
    cur.execute("select asset_id from getprocessid where status =%s LIMIT 1",
                ("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url + str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    # Fixed NameError: the original referenced undefined `idofasset`
    # (missing the trailing 's') in this UPDATE.
    cur.execute("update getprocessid set class = %s ,status =%s where asset_id = %s",
                (str(actual), "completed", str(idofassets),))

while True:

    # Spawn three workers, each running main() once per loop pass.
    # NOTE(review): all three threads execute the same SELECT on one
    # shared cursor, so they can all pick up the same row — this is
    # exactly the duplication described in the question.

    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)

    thread1.start()
    thread2.start()
    thread3.start()

    # Wait for all three before starting the next pass.
    thread1.join()
    thread2.join()
    thread3.join()


Solution

  • Your problem seems to be divided into two main different tasks:

    1 - Fetching the results from the getprocessid MySQL table

    2 - Processing the result and updating the same table (but different fields)

    So one way to optimize your code is to have a thread (it could be the main thread) do step 1 and then divide the problem in step 2 among your 3 threads:

    import requests
    import os
    import json
    import pymysql
    import threading
    # One work list per worker thread; create more (or fewer) entries
    # here to change the number of threads used below.
    batches = [[], [], []]
    
    # NOTE(review): USER and PASSWORD must be defined elsewhere.
    conn = pymysql.connect(host='localhost', user=USER, 
      passwd=PASSWORD, 
    db='sampledb',charset='utf8mb4',autocommit=True)
    
    url = "http://www.someapi.com/somelink/"
    
    # NOTE(review): a single cursor shared by all worker threads —
    # pymysql is not thread-safe; confirm before relying on this.
    cur = conn.cursor()
    
    def fetch_and_split():
        """Fetch every pending asset id and distribute the ids
        round-robin across the module-level ``batches`` lists, one list
        per worker thread.
        """
        # Reset the batches so ids from a previous pass are not
        # re-processed (the original kept appending forever).
        for batch in batches:
            batch.clear()
        # No LIMIT 1 here: we want all pending rows so they can be
        # split among the workers (the original's LIMIT 1 returned a
        # single row, defeating the batching).
        cur.execute("select asset_id from getprocessid where status =%s",
                    ("uploaded",))
        results = cur.fetchall()
        # Round-robin assignment: row i goes to batch i % len(batches).
        # (The original wrote `batches[size(batches) % count]`, which
        # reversed the modulo, used an undefined `size`, divided by
        # zero on the first row, and ended with invalid `count++`.)
        for i, row in enumerate(results):
            batches[i % len(batches)].append(row[0])
    
    def process_and_update(batch):
        """Process one batch of asset ids: query the API for each id
        and write the returned class back to the getprocessid table.

        batch -- list of asset_id values assigned to this thread.
        """
        # One session reused for the whole batch (the original built a
        # new Session per id, discarding connection reuse).
        req = requests.Session()
        for idofassets in batch:
            resp = req.get(url + str(idofassets))
            resp_json = json.loads(resp.text)
            actual = resp_json['getResponse']
            # Fixed typo: the original referenced undefined
            # `idofasset` here.  The SQL is also re-joined onto one
            # line — the original split the string literal across
            # lines, a syntax error.
            cur.execute(
                "update getprocessid set class = %s ,status =%s where asset_id = %s",
                (str(actual), "completed", str(idofassets),))
    
    
    while True:
    
        # The main thread fetches the pending ids and splits them into
        # per-thread batches; the workers then each process their own
        # batch, so no two threads receive the same id.
        fetch_and_split()    
        # NOTE(review): as written, fetch_and_split never clears the
        # batches, so ids from earlier passes are re-processed every
        # iteration — confirm it is meant to reset them.
        thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
        thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
        thread3 = threading.Thread(target=process_and_update, args=(batches[2],))
    
        thread1.start()
        thread2.start()
        thread3.start()
    
        # Wait for all workers before fetching the next set of rows.
        thread1.join()
        thread2.join()
        thread3.join()