I'm trying to read a single parquet file with snappy compression from s3 into a Dask Dataframe. There is no metadata directory, since this file was written using Spark 2.1
It does not work locally with fastparquet
import dask.dataframe as dd
dd.read_parquet('test.snappy.parquet', engine='fastparquet')
I get these exceptions:
NotADirectoryError Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
95 self.fn = fn2
---> 96 with open_with(fn2, 'rb') as f:
97 self._parse_header(f, verify)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/core.py in __enter__(self)
311 mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 312 f = f2 = self.myopen(self.path, mode=mode)
313 CompressFile = merge(seekable_files, compress_files)[self.compression]
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
60 path = self._trim_filename(path)
---> 61 return open(path, mode=mode)
62
NotADirectoryError: [Errno 20] Not a directory: '/home/arinarmo/test.snappy.parquet/_metadata'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
118 try:
--> 119 fmd = read_thrift(f, parquet_thrift.FileMetaData)
120 except Exception:
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/thrift_structures.py in read_thrift(file_obj, ttype)
21 obj = ttype()
---> 22 obj.read(pin)
23
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py in read(self, iprot)
1864 if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
-> 1865 iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec))
1866 return
TypeError: expecting list of size 2 for struct args
During handling of the above exception, another exception occurred:
ParquetException Traceback (most recent call last)
<ipython-input-21-0dc755d9917b> in <module>()
----> 1 dd.read_parquet('test.snappy.parquet', engine='fastparquet')
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
763
764 return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765 categories=categories, index=index)
766
767
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_fastparquet(fs, paths, myopen, columns, filters, categories, index, storage_options)
209 sep=fs.sep)
210 except Exception:
--> 211 pf = fastparquet.ParquetFile(paths[0], open_with=myopen, sep=fs.sep)
212
213 check_column_names(pf.columns, categories)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
100 self.fn = fn
101 with open_with(fn, 'rb') as f:
--> 102 self._parse_header(f, verify)
103 self.open = open_with
104
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
120 except Exception:
121 raise ParquetException('Metadata parse failed: %s' %
--> 122 self.fn)
123 self.head_size = head_size
124 self.fmd = fmd
ParquetException: Metadata parse failed: test.snappy.parquet
It works with a local parquet file and pyarrow:
dd.read_parquet('test.snappy.parquet', engine='pyarrow')
Finally, trying with S3 and a pyarrow fails too:
dd.read_parquet('s3://redacted-location/test.snappy.parquet', engine='pyarrow')
With the following exception:
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
763
764 return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765 categories=categories, index=index)
766
767
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, paths, file_opener, columns, filters, categories, index)
492 columns = list(columns)
493
--> 494 dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
495 schema = dataset.schema.to_arrow_schema()
496 has_pandas_metadata = schema.metadata is not None and b'pandas' in schema.metadata
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in __init__(self, path_or_paths, filesystem, schema, metadata, split_row_groups, validate_schema)
703
704 if validate_schema:
--> 705 self.validate_schemas()
706
707 def validate_schemas(self):
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in validate_schemas(self)
712 self.schema = open_file(self.metadata_path).schema
713 else:
--> 714 self.schema = self.pieces[0].get_metadata(open_file).schema
715 elif self.schema is None:
716 self.schema = self.metadata.schema
IndexError: list index out of range
In this issue the use of fastparquet.writer.merge
is suggested, since it supposedly writes the metadata directory, but it fails for me with the same error as before
The error given by fastparquet is misleading: it first tries to load a directory, which fails, then it loads the path as a file directly. The real error is in the decoding of the thrift metadata. Since this commit you may find that parsing the file now does work.