Tags: conda, hadoop-yarn, dask, dask-distributed, msgpack

dask-yarn job fails with dumps_msgpack ImportError while reading parquet


I am trying to do a simple read and count of a small parquet file (10K records) using dask-yarn on an AWS EMR cluster with one master node and one worker node, both m5.xlarge instances.

I am trying to execute the following code just to test my cluster:

import os
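# tell pyarrow where to find the native libhdfs shipped with Hadoop on EMR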
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/lib/hadoop/lib/native/'

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd

cluster = YarnCluster(environment='conf/conda_envs/dask_yarn.tar.gz',
                      worker_vcores=1,
                      worker_memory="2GiB")

cluster.scale(2)

client = Client(cluster)

# path = 's3://bucket-name/samples/data_10K/*'
path = 'hdfs:///samples/data_10K/*'

df = dd.read_parquet(path, engine='pyarrow', columns=['YEAR', 'MONTH', 'DAY_OF_MONTH', 'FL_DATE', 'DEP_TIME', 'ARR_TIME', 'ORIGIN', 'DEST', 'ACTUAL_ELAPSED_TIME'])

print(df.count().compute())

client.shutdown()
cluster.shutdown()

However, I get this exception:

Traceback (most recent call last):
  File "dask_test.py", line 30, in <module>
    print(df.count().compute())
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2646, in get
    futures = self._graph_to_futures(
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/highlevelgraph.py", line 946, in __dask_distributed_pack__
    from distributed.protocol.core import dumps_msgpack
ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/protocol/core.py)
Exception ignored in: <function YarnCluster.__del__ at 0x7f6584a2ac10>

The exception appears whether I read from S3 or HDFS.

My current conda environment is the following:

# packages in environment at /home/hadoop/miniconda3/envs/dask_yarn:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       1_gnu    conda-forge
aiobotocore               1.3.0              pyhd8ed1ab_0    conda-forge
aiohttp                   3.7.4            py38h497a2fe_0    conda-forge
aioitertools              0.7.1              pyhd8ed1ab_0    conda-forge
async-timeout             3.0.1                   py_1000    conda-forge
attrs                     20.3.0             pyhd3deb0d_0    conda-forge
blas                      1.0                    openblas    anaconda
bokeh                     2.2.3                    py38_0    anaconda
boost-cpp                 1.74.0               hc6e9bd1_2    conda-forge
botocore                  1.20.49            pyhd8ed1ab_0    conda-forge
brotlipy                  0.7.0           py38h497a2fe_1001    conda-forge
bzip2                     1.0.8                h7f98852_4    conda-forge
c-ares                    1.17.1               h7f98852_1    conda-forge
ca-certificates           2020.12.5            ha878542_0    conda-forge
certifi                   2020.12.5        py38h578d9bd_1    conda-forge
cffi                      1.14.5           py38ha65f79e_0    conda-forge
chardet                   4.0.0            py38h578d9bd_1    conda-forge
click                     7.1.2              pyh9f0ad1d_0    conda-forge
cloudpickle               1.6.0                      py_0    conda-forge
conda-pack                0.6.0              pyhd3deb0d_0    conda-forge
cryptography              3.4.7            py38ha5dfef3_0    conda-forge
curl                      7.76.1               h979ede3_1    conda-forge
cytoolz                   0.11.0           py38h497a2fe_3    conda-forge
dask                      2021.4.0           pyhd3eb1b0_0  
dask-core                 2021.4.0           pyhd3eb1b0_0  
dask-yarn                 0.9              py38h578d9bd_0    conda-forge
distributed               2021.4.1         py38h578d9bd_0    conda-forge
freetype                  2.10.4               h5ab3b9f_0    anaconda
fsspec                    2021.4.0           pyhd8ed1ab_0    conda-forge
gettext                   0.19.8.1          h0b5b191_1005    conda-forge
greenlet                  1.0.0            py38h709712a_0    conda-forge
grpcio                    1.37.0           py38hdd6454d_0    conda-forge
heapdict                  1.0.1                      py_0    conda-forge
icu                       68.1                 h58526e2_0    conda-forge
idna                      3.1                pyhd3deb0d_0    conda-forge
jinja2                    2.11.2                     py_0    anaconda
jmespath                  0.10.0             pyh9f0ad1d_0    conda-forge
jpeg                      9b                   habf39ab_1    anaconda
krb5                      1.17.2               h926e7f8_0    conda-forge
lcms2                     2.11                 h396b838_0    anaconda
ld_impl_linux-64          2.35.1               hea4e1c9_2    conda-forge
libcurl                   7.76.1               hc4aaa36_1    conda-forge
libedit                   3.1.20191231         he28a2e2_2    conda-forge
libev                     4.33                 h516909a_1    conda-forge
libffi                    3.3                  h58526e2_2    conda-forge
libgcc-ng                 9.3.0               h2828fa1_19    conda-forge
libgcrypt                 1.9.3                h7f98852_0    conda-forge
libgfortran-ng            7.3.0                hdf63c60_0    anaconda
libgomp                   9.3.0               h2828fa1_19    conda-forge
libgpg-error              1.42                 h9c3ff4c_0    conda-forge
libgsasl                  1.8.0                         2    conda-forge
libhdfs3                  2.3               hb485604_1015    conda-forge
libiconv                  1.16                 h516909a_0    conda-forge
libnghttp2                1.43.0               h812cca2_0    conda-forge
libntlm                   1.4               h7f98852_1002    conda-forge
libopenblas               0.3.10               h5a2b251_0    anaconda
libpng                    1.6.37               hbc83047_0    anaconda
libprotobuf               3.15.8               h780b84a_0    conda-forge
libssh2                   1.9.0                ha56f1ee_6    conda-forge
libstdcxx-ng              9.3.0               h6de172a_19    conda-forge
libtiff                   4.1.0                h2733197_1  
libuuid                   2.32.1            h7f98852_1000    conda-forge
libxml2                   2.9.10               h72842e0_4    conda-forge
locket                    0.2.0                      py_2    conda-forge
lz4-c                     1.9.3                h9c3ff4c_0    conda-forge
markupsafe                1.1.1            py38h7b6447c_0    anaconda
msgpack-python            1.0.2            py38h1fd1430_1    conda-forge
multidict                 5.1.0            py38h497a2fe_1    conda-forge
ncurses                   6.2                  h58526e2_4    conda-forge
numpy                     1.19.1           py38h30dfecb_0    anaconda
numpy-base                1.19.1           py38h75fe3a5_0    anaconda
olefile                   0.46                       py_0    anaconda
openssl                   1.1.1k               h7f98852_0    conda-forge
packaging                 20.4                       py_0    anaconda
pandas                    1.1.3            py38he6710b0_0    anaconda
partd                     1.2.0              pyhd8ed1ab_0    conda-forge
pillow                    8.0.0            py38h9a89aac_0    anaconda
pip                       21.1               pyhd8ed1ab_0    conda-forge
protobuf                  3.15.8           py38h709712a_0    conda-forge
psutil                    5.8.0            py38h497a2fe_1    conda-forge
pyarrow                   4.0.0                    pypi_0    pypi
pycparser                 2.20               pyh9f0ad1d_2    conda-forge
pyopenssl                 20.0.1             pyhd8ed1ab_0    conda-forge
pyparsing                 2.4.7                      py_0    anaconda
pysocks                   1.7.1            py38h578d9bd_3    conda-forge
python                    3.8.8           hffdb5ce_0_cpython    conda-forge
python-dateutil           2.8.1                      py_0    anaconda
python_abi                3.8                      1_cp38    conda-forge
pytz                      2020.1                     py_0    anaconda
pyyaml                    5.4.1            py38h497a2fe_0    conda-forge
readline                  8.1                  h46c0cb4_0    conda-forge
s3fs                      2021.4.0           pyhd8ed1ab_0    conda-forge
setuptools                49.6.0           py38h578d9bd_3    conda-forge
six                       1.15.0             pyh9f0ad1d_0    conda-forge
skein                     0.8.1            py38h578d9bd_1    conda-forge
sortedcontainers          2.3.0              pyhd8ed1ab_0    conda-forge
sqlalchemy                1.4.11           py38h497a2fe_0    conda-forge
sqlite                    3.35.5               h74cdb3f_0    conda-forge
tblib                     1.7.0              pyhd8ed1ab_0    conda-forge
tk                        8.6.10               h21135ba_1    conda-forge
toolz                     0.11.1                     py_0    conda-forge
tornado                   6.1              py38h497a2fe_1    conda-forge
typing-extensions         3.7.4.3                       0    conda-forge
typing_extensions         3.7.4.3                    py_0    anaconda
urllib3                   1.26.4             pyhd8ed1ab_0    conda-forge
wheel                     0.36.2             pyhd3deb0d_0    conda-forge
wrapt                     1.12.1           py38h497a2fe_3    conda-forge
xz                        5.2.5                h516909a_1    conda-forge
yaml                      0.2.5                h516909a_0    conda-forge
yarl                      1.6.3            py38h497a2fe_1    conda-forge
zict                      2.0.0                      py_0    conda-forge
zlib                      1.2.11            h516909a_1010    conda-forge
zstd                      1.4.9                ha95c52a_0    conda-forge

I had to install pyarrow with pip3; otherwise I get another exception that prevents me from reading from HDFS or S3 at all.

The YARN logs are as follows:

21/04/28 23:28:31 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:28:31 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:dask.scheduler.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:787
LogContents:
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://XXXXXXXXXXXX:32843
distributed.scheduler - INFO -   dashboard at:                    :34205
distributed.scheduler - INFO - Receive client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Close client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d

End of LogType:dask.scheduler.log
***********************************************************************************


End of LogType:prelaunch.err
******************************************************************************

Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container

End of LogType:prelaunch.out
******************************************************************************

Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:application.master.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:2700
LogContents:
21/04/28 23:27:54 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/04/28 23:27:54 INFO skein.ApplicationMaster: Running as user hadoop
21/04/28 23:27:54 INFO conf.Configuration: resource-types.xml not found
21/04/28 23:27:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/04/28 23:27:54 INFO skein.ApplicationMaster: Application specification successfully loaded
21/04/28 23:27:55 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8030
21/04/28 23:27:55 INFO skein.ApplicationMaster: gRPC server started at ip-xxxxxxxxx.us-west-1.compute.internal:46405
21/04/28 23:27:56 INFO skein.ApplicationMaster: WebUI server started at ip-xxxxxxxxx.us-west-1.compute.internal:46231
21/04/28 23:27:56 INFO skein.ApplicationMaster: Registering application with resource manager
21/04/28 23:27:56 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:27:56 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
21/04/28 23:27:57 INFO skein.ApplicationMaster: Starting container_1619645048753_0020_01_000002...
21/04/28 23:27:57 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1619645048753_0020_01_000002
21/04/28 23:28:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
21/04/28 23:28:09 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
21/04/28 23:28:09 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Deleted application directory hdfs://ip-XXXXXXXXX.us-west-1.compute.internal:8020/user/hadoop/.skein/application_1619645048753_0020
21/04/28 23:28:09 INFO skein.ApplicationMaster: WebUI server shut down
21/04/28 23:28:09 INFO skein.ApplicationMaster: gRPC server shut down

End of LogType:application.master.log
***************************************************************************************


End of LogType:prelaunch.err
******************************************************************************

Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container

End of LogType:prelaunch.out
******************************************************************************

Does anybody know a workaround for this issue?

Thank you!


Solution

  • Your dask and distributed versions have gone out of sync: dask is 2021.4.0 while distributed is 2021.4.1. Updating dask so the two match should fix this. Note that you also need to make sure the exact same versions end up in the environment you ship to YARN (the conda-pack tarball), not just in your local environment.
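
As a rough sketch of how you might apply and verify this, assuming the packed environment is rebuilt with conda-pack the same way it was built for the question (the version pin and environment name below are illustrative; what matters is that dask and distributed match everywhere):

# Illustrative commands to align the local conda env and rebuild the tarball
# that YarnCluster ships to the YARN containers:
#   conda install -n dask_yarn -c conda-forge dask=2021.4.1 distributed=2021.4.1
#   conda pack -n dask_yarn -o conf/conda_envs/dask_yarn.tar.gz --force
from dask.distributed import Client

client = Client(cluster)  # the YarnCluster created in the question
# check=True makes distributed raise an error if the client, scheduler, and
# workers report different package versions (dask, distributed, msgpack, ...)
versions = client.get_versions(check=True)
print(versions["client"]["packages"]["dask"],
      versions["client"]["packages"]["distributed"])

If the version check passes, the client and the containers agree on dask/distributed, and the dumps_msgpack import error should no longer be triggered.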