
How to use SQL table rows (or an alternative) as a work queue for multiple servers


I have 10 Ubuntu servers running on AWS as EC2 instances, and a list of arguments that need to be processed across those servers. The outputs are saved to a PostgreSQL database.

The arguments are a list of 1000 items that look like this:

args 

arg1
arg2
arg3
..
..
arg1000

I split the list evenly into 10 parts, so each server runs 100 arguments, to cut down the total run time.

So server1 reads a file containing the following list, while server2 gets args 101-200, and so on.

args 

arg1
arg2
arg3
..
..
arg100
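
The static split described above can be sketched as follows. This assumes the 1000 arguments are held in a Python list; `split_evenly` is a hypothetical helper, not part of the original code.

```python
def split_evenly(items, n_parts):
    """Split items into n_parts contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(items), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


args = [f"arg{i}" for i in range(1, 1001)]
chunks = split_evenly(args, 10)
# server1 gets chunks[0] (arg1-arg100), server2 gets chunks[1] (arg101-arg200), ...
```

Every chunk has the same number of arguments, but nothing in this split accounts for how long each argument takes to run.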

Each server then runs a script like this on its own file:

import pandas as pd
from my_functions import my_function, save_return_value_sql_db

# Each server reads its own file of arguments
df = pd.read_csv("arguments_file.csv")

for idx, row in df.iterrows():
    return_value = my_function(row.iloc[0])  # first column holds the argument
    save_return_value_sql_db(return_value)

Each return value is saved to a PostgreSQL table called return_values.

The arguments take different amounts of time to process. Sometimes 3 of the servers finish an hour later than the other 7, so the workload is unbalanced even though each server gets 100 arguments.
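
A small simulation illustrates the imbalance. It uses made-up run times (hypothetical, not measurements: 3 minutes for each of the first 300 arguments and 1 minute for each of the rest) and compares when the last server finishes under the fixed 10-way split versus a shared queue from which each idle server pulls the next argument. Both function names are illustrative only.

```python
import heapq


def static_makespan(durations, n_servers):
    """Finish time of the slowest server when each gets a fixed contiguous chunk."""
    size = len(durations) // n_servers  # assumes the list divides evenly
    return max(sum(durations[i * size:(i + 1) * size]) for i in range(n_servers))


def shared_queue_makespan(durations, n_servers):
    """Finish time when each server takes the next argument as soon as it is idle."""
    free_at = [0] * n_servers  # min-heap of the times at which each server becomes idle
    for d in durations:
        t = heapq.heappop(free_at)      # the earliest idle server...
        heapq.heappush(free_at, t + d)  # ...runs this argument next
    return max(free_at)


durations = [3] * 300 + [1] * 700  # hypothetical minutes per argument
print(static_makespan(durations, 10))        # 300
print(shared_queue_makespan(durations, 10))  # 160
```

With the fixed split, the three servers that received only slow arguments determine the total time; with a shared queue, the same work finishes when the total load is spread evenly.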

I want to change this setup.

I want to put a central argument list in a separate PostgreSQL table called states. As the Ubuntu servers go through this shared list, each server would set a flag in a second column for the argument it takes and then move on to the next row.

I am worried that multiple servers will grab the same row and run the same argument at the same time. Is there a way to guarantee that no argument is run more than once across the 10 servers?
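
The worry is justified whenever claiming an argument takes two separate statements: a SELECT to find an unclaimed row, then an UPDATE to mark it. The sketch below replays one bad interleaving for two hypothetical servers step by step. It uses Python's built-in `sqlite3` in-memory database as a stand-in for PostgreSQL, an assumption made only so the example runs without a database server; the same interleaving can occur across real connections to PostgreSQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (args TEXT PRIMARY KEY, state INTEGER NOT NULL DEFAULT 0)")
conn.executemany("INSERT INTO states (args) VALUES (?)", [("arg1",), ("arg2",)])


def find_unclaimed(conn):
    # Step 1: look up the first argument that is still unclaimed
    row = conn.execute("SELECT args FROM states WHERE state = 0 ORDER BY args LIMIT 1").fetchone()
    return row[0] if row else None


def mark_claimed(conn, arg):
    # Step 2: mark it as taken; nothing checks that it is still unclaimed
    conn.execute("UPDATE states SET state = 1 WHERE args = ?", (arg,))


# Both servers run step 1 before either of them runs step 2
server1_arg = find_unclaimed(conn)
server2_arg = find_unclaimed(conn)
mark_claimed(conn, server1_arg)
mark_claimed(conn, server2_arg)

print(server1_arg, server2_arg)  # arg1 arg1
```

Neither UPDATE fails: the second one simply sets a state that is already 1, so the script does not crash, but the duplicate run goes unnoticed.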

Here is what I have so far. I created an SQL table with 2 columns:

args     state

arg1     0
arg2     0
arg3     0
..
..
arg1000  0

Each server runs this Python script:

import sqlalchemy
from my_engines import STATE_ENGINE
from my_functions import my_function, save_return_value_sql_db

# Fetch every argument that has not been claimed yet
arg_list = STATE_ENGINE.execute("SELECT * FROM states WHERE state = 0 ORDER BY args ASC").fetchall()

current_arg = arg_list[0][0]

# Mark it as taken, passing the value as a bound parameter instead of formatting it into the SQL
STATE_ENGINE.execute(
    sqlalchemy.text("UPDATE states SET state = 1 WHERE args = :arg"), {"arg": current_arg}
)

return_value = my_function(current_arg)
save_return_value_sql_db(return_value)

How do I modify this code so that the 10 servers never run any argument more than once, and don't crash when one server tries to set a state to 1 while another server is doing the same thing?
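
One way to close the gap between the SELECT and the UPDATE in the script above, without extra infrastructure, is to make the UPDATE itself the claim: add `AND state = 0` to its WHERE clause and check how many rows it changed. This is a swapped-in technique (an optimistic compare-and-set), not something from the original post; `claim_next` is a hypothetical helper, and `sqlite3` again stands in for PostgreSQL so the sketch runs standalone. In PostgreSQL under the default READ COMMITTED isolation level, a second UPDATE of the same row waits for the first to commit, then re-checks `state = 0`, matches nothing, and reports 0 rows, so only one server wins each argument.

```python
import sqlite3


def claim_next(conn):
    """Return an argument this worker now owns, or None when nothing is left."""
    while True:
        row = conn.execute(
            "SELECT args FROM states WHERE state = 0 ORDER BY args LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # no unclaimed arguments remain
        cur = conn.execute(
            # Only changes the row if nobody claimed it since our SELECT
            "UPDATE states SET state = 1 WHERE args = ? AND state = 0",
            (row[0],),
        )
        conn.commit()
        if cur.rowcount == 1:
            return row[0]  # this worker won the row
        # rowcount == 0: another worker claimed it first; try the next row


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (args TEXT PRIMARY KEY, state INTEGER NOT NULL DEFAULT 0)")
conn.executemany("INSERT INTO states (args) VALUES (?)", [("arg1",), ("arg2",), ("arg3",)])
conn.commit()

claimed = []
while (arg := claim_next(conn)) is not None:
    claimed.append(arg)  # my_function(arg) and save_return_value_sql_db(...) would go here
print(claimed)  # ['arg1', 'arg2', 'arg3']
```

Under heavy contention, all servers race for the same first row and most attempts report 0 rows and retry; the `SKIP LOCKED` approach in the answer avoids that, though with 10 servers and long-running arguments the retries are likely cheap.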

Is there another data structure or library that would make this easier? I have no idea where to start. I googled a little and came across RabbitMQ and ZeroMQ, but I don't know how they work or whether they are right for this.


Solution

  • You could start a transaction and use:

    SELECT * FROM states WHERE state = 0
    ORDER BY args ASC
    LIMIT 100 -- without a limit, the first worker would lock the whole list
    FOR NO KEY UPDATE SKIP LOCKED -- if other workers already hold rows, take the next 100 unlocked ones
    

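    as a cheap concurrency mechanism. In the same transaction, set state = 1 on the rows you got back and then commit. Rows that another worker has already locked are skipped rather than waited on, so no two workers receive the same row.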
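
A minimal worker loop built around the answer's query might look like the sketch below. It assumes a DB-API connection from psycopg2 (or psycopg 3), whose `%s` placeholders and Python-list-to-array adaptation the `ANY(%s)` clause relies on; `claim_batch`, `run_worker`, and the `process` callback are hypothetical names. It commits the claim straight away instead of holding the transaction open while the arguments run, which keeps transactions short but means a server that crashes mid-batch leaves its claimed arguments at state = 1 until someone resets them. `SKIP LOCKED` requires PostgreSQL 9.5 or later.

```python
CLAIM_SQL = """
    SELECT args FROM states
    WHERE state = 0
    ORDER BY args ASC
    LIMIT %s
    FOR NO KEY UPDATE SKIP LOCKED
"""
MARK_SQL = "UPDATE states SET state = 1 WHERE args = ANY(%s)"


def claim_batch(conn, batch_size):
    """Lock up to batch_size unclaimed rows, mark them as taken, and commit."""
    with conn.cursor() as cur:
        cur.execute(CLAIM_SQL, (batch_size,))
        batch = [row[0] for row in cur.fetchall()]
        if batch:
            cur.execute(MARK_SQL, (batch,))
    conn.commit()  # releases the row locks; the claimed rows now have state = 1
    return batch


def run_worker(conn, process, batch_size=1):
    """Keep claiming batches and processing them until no unclaimed rows remain."""
    while True:
        batch = claim_batch(conn, batch_size)
        if not batch:
            return
        for arg in batch:
            process(arg)  # e.g. call my_function and save_return_value_sql_db
```

Here `conn` would come from something like `psycopg2.connect(...)` with your own connection details, and `process` would wrap `my_function` and `save_return_value_sql_db`. On batch size: with 1000 arguments, 10 servers, and `LIMIT 100`, the servers' first claims already take all 1000 rows, which reproduces the original static split. A small batch such as 1 lets fast servers keep pulling work and should balance the load better, at the cost of one short transaction per argument.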