I'm putting together a proof of concept in which I want to use PyCuda to process large-ish files of character data (~8GB in one file per task) in a distributed environment - AWS to be specific. I'm aware that HDFS will segment the data file and distribute it to the workers but I'm trying to keep my environment as simple as possible and would rather not have to install Hadoop if I don't have to.
I recently watched a couple of webinars from Continuum Analytics about their Dask framework and it looks like it will do exactly what I need. Given the above paragraph and the Dask framework, what is the current recommendation for filesystem? Am I stuck with HDFS or is there a better/simpler solution?
Most file systems provide the ability to read just part of a file, including HDFS, your local file system, and S3, the standard bulk data store for AWS instances. This allows parallel computing frameworks (like Dask) to partition up large files into many smaller bits that workers can crunch on in parallel.
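To make that concrete, here is a rough sketch of reading a single byte range straight out of S3 with s3fs (the S3 filesystem library that Dask builds on); the bucket and key names are placeholders:
import s3fs

# Credentials come from the usual AWS environment/configuration
fs = s3fs.S3FileSystem()

# Read one 100MB slice of a single object without downloading the rest of it
with fs.open('bucket/keys.0.foo', 'rb') as f:   # placeholder bucket/key
    f.seek(100000000)
    chunk = f.read(100000000)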
For most use cases this partitioning happens behind the scenes automatically (users of read_text and read_csv needn't bother worrying about this). It sounds like you have a custom file format, so I'll direct you to the read_bytes function. For S3, this works as follows:
from dask.bytes import read_bytes
sample, partitions = read_bytes('s3://bucket/keys.*.foo',
                                blocksize=100000000)
sample will be a short 10kB sample of your data and partitions will be a list of dask.delayed objects that you can use with general for loops to construct your computation. If your data has a delimiter of some sort that you want dask to respect, you can provide that with the delimiter= keyword argument.
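As a rough illustration of that for-loop pattern (count_newlines is a placeholder function of my own, not part of Dask, and I'm following the flat list of delayed values described above), you might do something like:
from dask import delayed, compute

def count_newlines(block):
    # block arrives as the raw bytes of one partition
    return block.count(b'\n')

# Build one lazy task per partition, then run them all and combine the results
counts = [delayed(count_newlines)(part) for part in partitions]
total = sum(compute(*counts))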
The same read_bytes function works with other systems, like your local file system or HDFS (if you have installed and imported hdfs3 and distributed).
sample, partitions = read_bytes('local://bucket/keys.*.foo', blocksize=100000000)
sample, partitions = read_bytes('hdfs://bucket/keys.*.foo')
As an example, here is an incorrect but illustrative version of how we implement dask.dataframe.read_csv:
from dask import delayed
from dask.bytes import read_bytes
import pandas as pd
import dask.dataframe as dd

def read_csv(path, **kwargs):
    # Split the file at newline boundaries into lazily-read blocks of bytes
    sample, partitions = read_bytes(path, blocksize=100000000, delimiter=b'\n')
    # Parse each block into a pandas DataFrame without triggering any work yet
    dataframes = [delayed(pd.read_csv)(part, **kwargs) for part in partitions]
    # Stitch the lazy DataFrames into a single dask DataFrame
    return dd.from_delayed(dataframes)
This is incorrect because pd.read_csv actually wants a BytesIO object, we haven't handled keyword arguments robustly, and we aren't managing dataframe metadata well from the sample (columns, dtypes, etc.). These details get in the way of the general point though and are probably beyond the interest of this question.
People have been referring to this question as an answer to the more general question of "How do I read data from S3?" Most people don't use the read_bytes interface, which is somewhat low level. Instead, most users will probably want to use one of the higher-level functions like the following:
import json
import dask.bag as db
records = db.read_text('s3://bucket/keys.*.json').map(json.loads)
import dask.dataframe as dd
df = dd.read_csv('s3://bucket/keys.*.csv')