Tags: dask, dask-distributed, dask-delayed, dask-dataframe

Dask - load dataframe from SQL without specifying index_col


I'm trying to load a Dask dataframe from a SQL connection. Per the read_sql_table documentation, it is necessary to pass in an index_col. What should I do if there may be no good column to act as the index?

Could this be a suitable replacement?

import math

import dask
import dask.dataframe as dd
import pandas as pd

# Assumed defined earlier: table, sql_uri, chunk_size, num_records, dfs

# Break the SQL query into chunks
chunks = []
num_chunks = math.ceil(num_records / chunk_size)

# Build a delayed pandas read for each chunk; note that the
# 'LIMIT offset, count' form used here is MySQL syntax
for i in range(num_chunks):
    query = f'SELECT * FROM {table} LIMIT {i * chunk_size}, {chunk_size}'
    chunk = dask.delayed(pd.read_sql)(query, sql_uri)
    chunks.append(chunk)

# Aggregate the chunks into a single Dask dataframe
df = dd.from_delayed(chunks)
dfs[table] = df
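
For completeness, here is one way num_records could be obtained before chunking; this is a sketch assuming sql_uri is a connection string pandas accepts and that table is a trusted name rather than user input:

import pandas as pd

# Count the rows once up front so num_chunks can be computed
count_query = f'SELECT COUNT(*) AS n FROM {table}'
num_records = pd.read_sql(count_query, sql_uri)['n'].iloc[0]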

Solution

  • Unfortunately, LIMIT/OFFSET is not, in general, a reliable way to partition a query in most SQL implementations. In particular, to reach an offset and fetch later rows, the engine must often scan past all of the earlier rows first, so the total work to generate a set of partitions is greatly magnified. Worse, a query without an ORDER BY clause has no guaranteed row order, so in some cases you might even end up with missed or duplicated rows. This was the reasoning behind requiring boundary values in Dask's read_sql_table implementation; a sketch of that approach follows at the end of this answer.

    However, there is nothing wrong in principle with the way you are setting up your Dask dataframe. If you can show that your server does not suffer from the problems we were anticipating, then you are welcome to take that approach.
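
    For reference, a minimal sketch of the boundary-value approach that read_sql_table takes, assuming the table (hypothetically named my_table here) has an indexed numeric id column that can serve as the index:

    import dask.dataframe as dd

    # Dask queries the min/max of 'id' and splits that range into
    # contiguous, non-overlapping per-partition WHERE clauses, so no
    # partition has to scan past another partition's rows
    df = dd.read_sql_table('my_table', sql_uri, index_col='id', npartitions=8)

    # Alternatively, supply the boundary values yourself:
    # df = dd.read_sql_table('my_table', sql_uri, index_col='id',
    #                        divisions=[0, 250_000, 500_000, 750_000, 1_000_000])

    Each partition then becomes a bounded range query that the database can serve efficiently through the index on id, no matter how many partitions precede it.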