Tags: python, pandas, dask, amazon-emr

Operations on a Dask DataFrame fail when using snappy compression


I partitioned a large dataset into a sequence of parquet files using pandas.DataFrame.to_parquet and saved them to S3. I then read them into Dask on a cluster using dask.dataframe.read_parquet:

import dask.dataframe as dd
df = dd.read_parquet(
    's3://aleksey-emr-dask/data/2019-taxi-dataset/',
    storage_options={'key': 'secret', 'secret': 'secret'},
    engine='fastparquet'
)
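
For context, here is a minimal sketch of the write side (the DataFrame contents are illustrative, not the real dataset). pandas passes compression='snappy' to the parquet engine by default:

import pandas as pd

df = pd.DataFrame({'VendorID': [1.0, 2.0], 'trip_distance': [1.5, 3.2]})

# compression='snappy' is already the pandas default for to_parquet;
# spelling it out just makes the write side of the pipeline explicit.
df.to_parquet('part-0.parquet', engine='fastparquet', compression='snappy')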

pandas uses snappy compression by default when writing parquet. fastparquet can work with this compression as long as the python-snappy and snappy packages are installed. Since I am running on AWS EMR and using Dask's example EMR bootstrap script, I installed these packages from conda-forge using the --bootstrap-actions flag and its --conda-packages optional argument:

python3 -m pip list | grep snappy
python-snappy          0.5.4
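
A quick round-trip is an easy way to confirm the codec actually works in a given environment (the scheduler address below is a placeholder):

import snappy  # provided by the python-snappy package

payload = b'taxi data ' * 1024
assert snappy.decompress(snappy.compress(payload)) == payload

# The same check can be pushed to every worker on the cluster:
#   from dask.distributed import Client
#   client = Client('scheduler-address:8786')  # placeholder address
#   client.run(lambda: __import__('snappy').__version__)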

This is enough to make dd.read_parquet succeed. However, certain operations fail with KeyError: 'snappy'. For example, this fails:

passenger_counts = df.trip_distance.value_counts().compute()

I know this is not an error with the cluster configuration because other operations, like this one, succeed:

vendors = df.VendorID.value_counts().compute()
> 2.0    53516733
> 1.0    30368157
> 4.0      267080
> Name: VendorID, dtype: int64

Which leads to my question: does Dask not support snappy compression, even if its IO engine (fastparquet in this case) does?

Here is the full body of the error message:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<timed exec> in <module>

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    445         postcomputes.append(x.__dask_postcompute__())
    446 
--> 447     results = schedule(dsk, keys, **kwargs)
    448     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    449 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2686                     should_rejoin = False
   2687             try:
-> 2688                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2689             finally:
   2690                 for f in futures.values():

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1986                 direct=direct,
   1987                 local_worker=local_worker,
-> 1988                 asynchronous=asynchronous,
   1989             )
   1990 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1874                 else:
   1875                     self._gather_future = future
-> 1876                 response = await future
   1877 
   1878             if response["status"] == "error":

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1925 
   1926             else:  # ask scheduler to gather data for us
-> 1927                 response = await retry_operation(self.scheduler.gather, keys=keys)
   1928 
   1929         return response

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    388         delay_min=retry_delay_min,
    389         delay_max=retry_delay_max,
--> 390         operation=operation,
    391     )

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    859             name, comm.name = comm.name, "ConnectionPool." + key
    860             try:
--> 861                 result = await send_recv(comm=comm, op=key, **kwargs)
    862             finally:
    863                 self.pool.reuse(self.addr, comm)

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    642         await comm.write(msg, serializers=serializers, on_error="raise")
    643         if reply:
--> 644             response = await comm.read(deserializers=deserializers)
    645         else:
    646             response = None

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    204                     deserialize=self.deserialize,
    205                     deserializers=deserializers,
--> 206                     allow_offload=self.allow_offload,
    207                 )
    208             except EOFError:

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers, allow_offload)
     85         res = await offload(_from_frames)
     86     else:
---> 87         res = _from_frames()
     88 
     89     return res

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in _from_frames()
     64         try:
     65             return protocol.loads(
---> 66                 frames, deserialize=deserialize, deserializers=deserializers
     67             )
     68         except EOFError:

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
    126             if deserialize or key in bytestrings:
    127                 if "compression" in head:
--> 128                     fs = decompress(head, fs)
    129                 fs = merge_frames(head, fs)
    130                 value = _deserialize(head, fs, deserializers=deserializers)

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in decompress(header, frames)
    214     return [
    215         compressions[c]["decompress"](frame)
--> 216         for c, frame in zip(header["compression"], frames)
    217     ]

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in <listcomp>(.0)
    214     return [
    215         compressions[c]["decompress"](frame)
--> 216         for c, frame in zip(header["compression"], frames)
    217     ]

KeyError: 'snappy'

Solution

  • You need snappy and python-snappy installed in the client environment as well. The traceback shows the failure in the client's gather path: the workers compress the serialized results with snappy before sending them back, and the client needs the same codec to decompress those frames into data. That is also why only some operations fail: a tiny result like the VendorID counts comes back in frames small enough that distributed skips compression, while the much larger trip_distance result arrives snappy-compressed.

    I'm accessing the cluster from a local Jupyter notebook on my machine via SSH port forwarding, and I did not have these packages installed locally. Installing them in my local environment:

    $ conda install -c conda-forge snappy python-snappy
    

    resolved the issue.
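
    As a sketch of why only the larger result tripped the error: distributed's maybe_compress helper (an internal API, so details may vary across versions) skips frames below a size threshold, roughly 10 kB by default, so small results travel uncompressed and never need the codec:

    from distributed.protocol.compression import maybe_compress

    small = b'x' * 1_000        # below the threshold: sent as-is
    large = b'taxi ' * 100_000  # large and highly compressible

    print(maybe_compress(small)[0])  # None
    print(maybe_compress(large)[0])  # whichever codec is installed,
                                     # e.g. 'snappy' or 'lz4'; None if neither

    After a fix like this, client.get_versions(check=True) is also worth running; it compares package versions across the client, scheduler, and workers, though it may not cover every optional codec.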