Tags: distributed-computing, dask

Read blocks of files in parallel from filesystem/S3 with Dask?


I'm putting together a proof of concept in which I want to use PyCUDA to process large-ish files of character data (~8GB in one file per task) in a distributed environment, AWS to be specific. I'm aware that HDFS will segment the data file and distribute it to the workers, but I'm trying to keep my environment as simple as possible and would rather not install Hadoop if I don't have to.

I recently watched a couple of webinars from Continuum Analytics about their Dask framework, and it looks like it will do exactly what I need. Given the above and the Dask framework, what is the current recommendation for a filesystem? Am I stuck with HDFS, or is there a better/simpler solution?


Solution

  • Most file systems provide the ability to read just part of a file, including HDFS, your local file system, and S3, the standard bulk data store for AWS instances. This lets parallel computing frameworks (like Dask) partition large files into many smaller blocks that workers can crunch on in parallel.
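
    To make the partial-read idea concrete: file interfaces such as the built-in open and s3fs both expose seek and read, so a worker can pull only its own byte range. A minimal sketch (the bucket, key, and offsets here are made up):

    import s3fs

    # Read one ~100MB block of a single S3 object instead of downloading the whole file
    fs = s3fs.S3FileSystem()                  # credentials come from the usual AWS config
    with fs.open('my-bucket/keys.0.foo', 'rb') as f:
        f.seek(100000000)                     # jump to this worker's starting offset
        block = f.read(100000000)             # read roughly 100MB from that offset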

    dask.bytes.read_bytes

    For most use cases this happens behind the scenes automatically (users of read_text and read_csv needn't worry about it). It sounds like you have a custom file format, so I'll direct you to the read_bytes function. For S3, this works as follows:

    from dask.bytes import read_bytes
    sample, partitions = read_bytes('s3://bucket/keys.*.foo', 
                                    blocksize=100000000)
    

    Here, sample will be a short 10kB sample of your data and partitions will be a list of dask.delayed objects that you can use in ordinary for loops to construct your computation.
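
    For instance, something along these lines, where process_block is just a placeholder for whatever per-block parsing or PyCUDA step you have in mind (it is not part of dask), and partitions is treated as a flat list as above:

    import dask
    from dask import delayed

    @delayed
    def process_block(block):
        # block is the raw bytes of one chunk; counting a token stands in for real work
        return block.count(b'foo')

    results = [process_block(part) for part in partitions]
    totals = dask.compute(*results)           # run all blocks in parallel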

    If your data has a delimiter of some sort that you want dask to respect, you can provide that with the delimiter= keyword argument.
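
    For example, to split blocks only at newline boundaries:

    sample, partitions = read_bytes('s3://bucket/keys.*.foo',
                                    blocksize=100000000,
                                    delimiter=b'\n')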

    This same function works with other systems, like your local file system or HDFS (if you have installed and imported hdfs3 and distributed).

    sample, partitions = read_bytes('local://bucket/keys.*.foo', blocksize=100000000)
    sample, partitions = read_bytes('hdfs://bucket/keys.*.foo')
    

    Example

    As an example, here is an incorrect but illustrative version of how we implement dask.dataframe.read_csv:

    from dask import delayed
    from dask.bytes import read_bytes
    import pandas as pd
    import dask.dataframe as dd
    
    def read_csv(path, **kwargs):
        sample, partitions = read_bytes(path, blocksize=100000000, delimiter=b'\n')
        dataframes = [delayed(pd.read_csv)(part, **kwargs) for part in partitions]
        return dd.from_delayed(dataframes)
    

    This is incorrect because pd.read_csv actually wants a BytesIO object, we haven't handled keyword arguments robustly, and we aren't managing dataframe metadata well from the sample (columns, dtypes, etc.). These details get in the way of the general point, though, and are probably beyond the scope of this question.
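
    If it helps to see what the BytesIO point means in practice, here is a rough sketch of just that one fix (bytes_to_frame is only an illustrative name, and the keyword-argument and metadata issues are still ignored):

    import io

    import pandas as pd
    from dask import delayed

    @delayed
    def bytes_to_frame(block, **kwargs):
        # pd.read_csv wants a file-like object, so wrap the raw bytes first
        return pd.read_csv(io.BytesIO(block), **kwargs)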

    Edit: Use other functions in more common cases

    People have been referring to this answer for the more general question of "How do I read data from S3?" Most people don't use the read_bytes interface, which is somewhat low level. Instead, most users will probably want one of the higher-level functions, like the following:

    import json

    import dask.bag as db
    records = db.read_text('s3://bucket/keys.*.json').map(json.loads)
    
    import dask.dataframe as dd
    df = dd.read_csv('s3://bucket/keys.*.csv')