mysql parallel-processing amazon-redshift boto3 batch-processing

Boto 3 Redshift client execute_statement lagging


I'm trying to copy some data from MySQL to Redshift (RS). The table has roughly 650,000 rows, so I'm doing it in batches of about 10,000, using Python's multiprocessing module. The first few iterations come out OK: the script pulls 10,000 rows and writes a CSV file to S3, then copies the data from that file into the Redshift table. (Edited to add: each such file is around 300 MB, so unfortunately it isn't feasible to upload the whole data set in one piece rather than in chunks.) But soon the script starts skipping chunks. It still uploads the batch file to S3, but the copy into the main table doesn't happen. The last run took 34 minutes, and here's a graph of the record ids (which should run from 1 to 650,000) that actually got copied to the table. You can see the jumps where a whole bunch of records didn't make it to the main table.

[Image: graph of the record ids copied to the Redshift table, with gaps where chunks were skipped]

Does anyone have any solutions for this? I'm currently running it locally, but the same thing happens when I upload the Docker image to AWS and run it from there.

from psycopg2.extensions import register_adapter
from psycopg2.extras import Json
import MySQLdb
import json
import boto3
from io import StringIO
import botocore.session as bc
from botocore.client import Config
import multiprocessing as mp

db = 'db'
cluster_id = 'cluster'
bucket = 'bucket'
rs_secret = 'rs_secret'

session = boto3.Session(
    botocore_session=bc.get_session(),
    profile_name='profile',
    region_name='us-west-2'
)

secret_arn = session.client('secretsmanager').get_secret_value(SecretId=rs_secret)['ARN']
config = Config(connect_timeout=180, read_timeout=180)
client_redshift = session.client("redshift-data", config=config)

# ------------------------------------------------------------------------------------

def get_secret(secret_id, session):
    region_name = "us-west-2"
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    get_secret_value_response = client.get_secret_value(SecretId=secret_id)
    secret = get_secret_value_response['SecretString']
    return json.loads(secret)


def upload_to_S3(data, key, session):
    s3 = session.resource('s3')
    csv_buffer = StringIO()
    data.to_csv(csv_buffer, index=False)
    s3.Object(bucket, key).put(Body=csv_buffer.getvalue())


def write_to_RS(query):
    try:
        result = client_redshift.execute_statement(Database=db,
                                                   SecretArn=secret_arn,
                                                   Sql=query,
                                                   ClusterIdentifier=cluster_id)
    except Exception as e:
        raise Exception(e)

# ------------------------------------------------------------------------------------

table_name = 'form'
col = ['settings', 'fields', 'design', 'confirmation']
types = ['super', 'super', 'super', 'super']

table_name_full = f'{db}.{table_name}'

# ------------------------------------------------------------------------------------

mysql_secret = get_secret('secret', session)

register_adapter(list, Json)

mydb = MySQLdb.connect(
        host = mysql_secret['host'],
        user = mysql_secret['username'],
        password = mysql_secret['password'],
        database = 'db'
)

mycursor = mydb.cursor()

# ------------------------------------------------------------------------------------

iam_role = 'iam_role'

chunk = 10000
chunks = range(65)

def task(c):
    q = f"""
    select
        id, {', '.join(col)}
    from {table_name}
    where not ({' is null and '.join(col)} is null)
        and not ({' = "[]" and '.join(col)} = "[]")
    limit {chunk}
    offset {chunk * (c)}
    INTO OUTFILE S3 's3://{bucket}/raw/{table_name}_{c + 1}.csv'
    FORMAT CSV HEADER
    OVERWRITE ON;
    """

    mycursor.execute(q)

    key = f'raw/{table_name}_{c + 1}.csv.part_00000'

    query = f"""
        COPY {table_name_full}
        FROM 's3://{bucket}/{key}'
        iam_role '{iam_role}'
        ignoreheader 1
        CSV;
    """

    write_to_RS(query)

if __name__ == "__main__":  
    pool = mp.Pool(mp.cpu_count())
    pool.map(task, chunks)
    pool.close()

Solution

  • There are lots of places where this can go awry: S3 file name collisions, S3 operations running out of order, an S3 object being read before its data has been written, and so on.

    The first thing to do is to find out whether all the data eventually ended up in S3. Your process should leave the files in S3 after it runs. If you run a single COPY of all those files from S3 into a new table, are all the records present?

    If not, the problem is upstream of Redshift, and you need to debug your code to understand why the files are incomplete.
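One way to run that first check is to list the objects under the `raw/` prefix and compare them with the keys the script was supposed to write. The sketch below assumes the key pattern from the question (`raw/{table_name}_{c + 1}.csv.part_00000`); the helper names `expected_keys`, `list_keys`, and `missing_chunks` are made up for illustration, and `list_keys` expects a boto3 S3 client such as `session.client('s3')`.

```python
def expected_keys(table_name, n_chunks):
    """Keys the question's script should have produced, one per chunk."""
    return {f"raw/{table_name}_{c + 1}.csv.part_00000" for c in range(n_chunks)}


def list_keys(s3_client, bucket, prefix):
    """Collect every object key under a prefix, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = set()
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page that holds no objects.
        for obj in page.get("Contents", []):
            keys.add(obj["Key"])
    return keys


def missing_chunks(found_keys, table_name, n_chunks):
    """Expected keys that were not found in S3, sorted for readability."""
    return sorted(expected_keys(table_name, n_chunks) - set(found_keys))
```

With the question's setup this would be called as `missing_chunks(list_keys(session.client('s3'), bucket, 'raw/form_'), 'form', 65)`. An empty list only shows that every file exists, not that each file holds the rows you expect, so the row count check is still needed.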

    If all the records are there, then you likely have an issue either with issuing the COPY before the data write is complete, or with the RS COPY commands failing. Either way, you should be able to check the responses to the COPY commands and see what errors or loaded row counts they report.
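With the `redshift-data` client, `execute_statement` returns as soon as the statement has been submitted, so the outcome of each COPY has to be fetched separately with `describe_statement`. Below is a minimal sketch of such a check. The function name `wait_for_statement` and the polling and timeout values are assumptions for illustration, not part of the question's code.

```python
import time


def wait_for_statement(client, statement_id, poll_seconds=2.0, timeout_seconds=600.0):
    """Poll describe_statement until the statement finishes, fails, or times out.

    `client` is expected to be a boto3 "redshift-data" client.
    Returns the final describe_statement response on success.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = client.describe_statement(Id=statement_id)
        status = desc["Status"]
        if status == "FINISHED":
            return desc
        if status in ("FAILED", "ABORTED"):
            # "Error" carries the message reported for a failed statement.
            raise RuntimeError(f"statement {statement_id} {status}: {desc.get('Error')}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"statement {statement_id} still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

In the question's `write_to_RS`, this could be called as `wait_for_statement(client_redshift, result['Id'])`. For a COPY, the `ResultRows` field of the returned response should report the number of rows affected, which you can compare with the chunk size.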

    I suspect you may be overrunning the maximum connection (session) count for Redshift, which is a hard limit of 500 for the entire cluster.

    You may also be seeing issues with the WLM being overrun, but that should clear with time. However, there could be a timeout that is aborting some of the COPYs.

    BTW, you are using RS in a very inefficient manner. 10,000 rows per file is quite small for RS, and issuing single-file COPY commands is also not ideal. You would likely be much better off building all the files in S3 first and then issuing a single COPY command. This would also have a good chance of fixing whatever the underlying problem is. If you can increase the S3 file size to at least 100MB (1GB would be ideal), that would also have a positive impact on your run time.
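As a rough sketch of the single-COPY approach: Redshift's COPY treats the S3 path as a key prefix and loads every object whose key starts with it, so one statement can cover all the chunk files once they have been written. The helper name `build_copy_sql` is made up for illustration, and the prefix `raw/form_` assumes the file naming used in the question.

```python
def build_copy_sql(table, bucket, prefix, iam_role):
    """Build one COPY statement that loads every S3 object under a key prefix."""
    return (
        f"COPY {table}\n"
        f"FROM 's3://{bucket}/{prefix}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "IGNOREHEADER 1\n"  # expected to skip the header row of each file
        "CSV;"
    )


# Values below mirror the placeholders used in the question.
copy_sql = build_copy_sql("db.form", "bucket", "raw/form_", "iam_role")
print(copy_sql)
```

The export tasks would then only write files, and a single `write_to_RS(copy_sql)` call would run after the pool has finished. Make sure no unrelated objects share the prefix, since they would be loaded as well.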