I am trying to do a simple read and count of a small Parquet file (10K records) using dask-yarn
on an AWS EMR cluster with one master node and one worker node, both m5.xlarge instances.
I am running the following code just to test the cluster:
import os
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/lib/hadoop/lib/native/'
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd
cluster = YarnCluster(environment='conf/conda_envs/dask_yarn.tar.gz',
                      worker_vcores=1,
                      worker_memory="2GiB")
cluster.scale(2)
client = Client(cluster)
# path = 's3://bucket-name/samples/data_10K/*'
path = 'hdfs:///samples/data_10K/*'
df = dd.read_parquet(path, engine='pyarrow', columns=['YEAR', 'MONTH', 'DAY_OF_MONTH', 'FL_DATE', 'DEP_TIME', 'ARR_TIME', 'ORIGIN', 'DEST', 'ACTUAL_ELAPSED_TIME'])
print(df.count().compute())
client.shutdown()
cluster.shutdown()
However, I get the following exception:
Traceback (most recent call last):
  File "dask_test.py", line 30, in <module>
    print(df.count().compute())
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2646, in get
    futures = self._graph_to_futures(
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/highlevelgraph.py", line 946, in __dask_distributed_pack__
    from distributed.protocol.core import dumps_msgpack
ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/protocol/core.py)
Exception ignored in: <function YarnCluster.__del__ at 0x7f6584a2ac10>
This exception appears both when reading from S3 and when reading from HDFS.
My current conda environment is the following:
# packages in environment at /home/hadoop/miniconda3/envs/dask_yarn:
#
# Name Version Build Channel
_libgcc_mutex 0.1 conda_forge conda-forge
_openmp_mutex 4.5 1_gnu conda-forge
aiobotocore 1.3.0 pyhd8ed1ab_0 conda-forge
aiohttp 3.7.4 py38h497a2fe_0 conda-forge
aioitertools 0.7.1 pyhd8ed1ab_0 conda-forge
async-timeout 3.0.1 py_1000 conda-forge
attrs 20.3.0 pyhd3deb0d_0 conda-forge
blas 1.0 openblas anaconda
bokeh 2.2.3 py38_0 anaconda
boost-cpp 1.74.0 hc6e9bd1_2 conda-forge
botocore 1.20.49 pyhd8ed1ab_0 conda-forge
brotlipy 0.7.0 py38h497a2fe_1001 conda-forge
bzip2 1.0.8 h7f98852_4 conda-forge
c-ares 1.17.1 h7f98852_1 conda-forge
ca-certificates 2020.12.5 ha878542_0 conda-forge
certifi 2020.12.5 py38h578d9bd_1 conda-forge
cffi 1.14.5 py38ha65f79e_0 conda-forge
chardet 4.0.0 py38h578d9bd_1 conda-forge
click 7.1.2 pyh9f0ad1d_0 conda-forge
cloudpickle 1.6.0 py_0 conda-forge
conda-pack 0.6.0 pyhd3deb0d_0 conda-forge
cryptography 3.4.7 py38ha5dfef3_0 conda-forge
curl 7.76.1 h979ede3_1 conda-forge
cytoolz 0.11.0 py38h497a2fe_3 conda-forge
dask 2021.4.0 pyhd3eb1b0_0
dask-core 2021.4.0 pyhd3eb1b0_0
dask-yarn 0.9 py38h578d9bd_0 conda-forge
distributed 2021.4.1 py38h578d9bd_0 conda-forge
freetype 2.10.4 h5ab3b9f_0 anaconda
fsspec 2021.4.0 pyhd8ed1ab_0 conda-forge
gettext 0.19.8.1 h0b5b191_1005 conda-forge
greenlet 1.0.0 py38h709712a_0 conda-forge
grpcio 1.37.0 py38hdd6454d_0 conda-forge
heapdict 1.0.1 py_0 conda-forge
icu 68.1 h58526e2_0 conda-forge
idna 3.1 pyhd3deb0d_0 conda-forge
jinja2 2.11.2 py_0 anaconda
jmespath 0.10.0 pyh9f0ad1d_0 conda-forge
jpeg 9b habf39ab_1 anaconda
krb5 1.17.2 h926e7f8_0 conda-forge
lcms2 2.11 h396b838_0 anaconda
ld_impl_linux-64 2.35.1 hea4e1c9_2 conda-forge
libcurl 7.76.1 hc4aaa36_1 conda-forge
libedit 3.1.20191231 he28a2e2_2 conda-forge
libev 4.33 h516909a_1 conda-forge
libffi 3.3 h58526e2_2 conda-forge
libgcc-ng 9.3.0 h2828fa1_19 conda-forge
libgcrypt 1.9.3 h7f98852_0 conda-forge
libgfortran-ng 7.3.0 hdf63c60_0 anaconda
libgomp 9.3.0 h2828fa1_19 conda-forge
libgpg-error 1.42 h9c3ff4c_0 conda-forge
libgsasl 1.8.0 2 conda-forge
libhdfs3 2.3 hb485604_1015 conda-forge
libiconv 1.16 h516909a_0 conda-forge
libnghttp2 1.43.0 h812cca2_0 conda-forge
libntlm 1.4 h7f98852_1002 conda-forge
libopenblas 0.3.10 h5a2b251_0 anaconda
libpng 1.6.37 hbc83047_0 anaconda
libprotobuf 3.15.8 h780b84a_0 conda-forge
libssh2 1.9.0 ha56f1ee_6 conda-forge
libstdcxx-ng 9.3.0 h6de172a_19 conda-forge
libtiff 4.1.0 h2733197_1
libuuid 2.32.1 h7f98852_1000 conda-forge
libxml2 2.9.10 h72842e0_4 conda-forge
locket 0.2.0 py_2 conda-forge
lz4-c 1.9.3 h9c3ff4c_0 conda-forge
markupsafe 1.1.1 py38h7b6447c_0 anaconda
msgpack-python 1.0.2 py38h1fd1430_1 conda-forge
multidict 5.1.0 py38h497a2fe_1 conda-forge
ncurses 6.2 h58526e2_4 conda-forge
numpy 1.19.1 py38h30dfecb_0 anaconda
numpy-base 1.19.1 py38h75fe3a5_0 anaconda
olefile 0.46 py_0 anaconda
openssl 1.1.1k h7f98852_0 conda-forge
packaging 20.4 py_0 anaconda
pandas 1.1.3 py38he6710b0_0 anaconda
partd 1.2.0 pyhd8ed1ab_0 conda-forge
pillow 8.0.0 py38h9a89aac_0 anaconda
pip 21.1 pyhd8ed1ab_0 conda-forge
protobuf 3.15.8 py38h709712a_0 conda-forge
psutil 5.8.0 py38h497a2fe_1 conda-forge
pyarrow 4.0.0 pypi_0 pypi
pycparser 2.20 pyh9f0ad1d_2 conda-forge
pyopenssl 20.0.1 pyhd8ed1ab_0 conda-forge
pyparsing 2.4.7 py_0 anaconda
pysocks 1.7.1 py38h578d9bd_3 conda-forge
python 3.8.8 hffdb5ce_0_cpython conda-forge
python-dateutil 2.8.1 py_0 anaconda
python_abi 3.8 1_cp38 conda-forge
pytz 2020.1 py_0 anaconda
pyyaml 5.4.1 py38h497a2fe_0 conda-forge
readline 8.1 h46c0cb4_0 conda-forge
s3fs 2021.4.0 pyhd8ed1ab_0 conda-forge
setuptools 49.6.0 py38h578d9bd_3 conda-forge
six 1.15.0 pyh9f0ad1d_0 conda-forge
skein 0.8.1 py38h578d9bd_1 conda-forge
sortedcontainers 2.3.0 pyhd8ed1ab_0 conda-forge
sqlalchemy 1.4.11 py38h497a2fe_0 conda-forge
sqlite 3.35.5 h74cdb3f_0 conda-forge
tblib 1.7.0 pyhd8ed1ab_0 conda-forge
tk 8.6.10 h21135ba_1 conda-forge
toolz 0.11.1 py_0 conda-forge
tornado 6.1 py38h497a2fe_1 conda-forge
typing-extensions 3.7.4.3 0 conda-forge
typing_extensions 3.7.4.3 py_0 anaconda
urllib3 1.26.4 pyhd8ed1ab_0 conda-forge
wheel 0.36.2 pyhd3deb0d_0 conda-forge
wrapt 1.12.1 py38h497a2fe_3 conda-forge
xz 5.2.5 h516909a_1 conda-forge
yaml 0.2.5 h516909a_0 conda-forge
yarl 1.6.3 py38h497a2fe_1 conda-forge
zict 2.0.0 py_0 conda-forge
zlib 1.2.11 h516909a_1010 conda-forge
zstd 1.4.9 ha95c52a_0 conda-forge
I had to install pyarrow with pip3; otherwise I get a different exception that prevents me from reading from HDFS or S3 at all.
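For what it's worth, a minimal check like the one below (the path and the 'default' host are taken from the example above and are assumptions, so adjust as needed) tells me whether the pip-installed pyarrow can reach HDFS at all, independently of Dask:

# Sanity check, run directly on the EMR master node: can the pip-installed
# pyarrow talk to HDFS? Assumes ARROW_LIBHDFS_DIR points at the native
# Hadoop libraries and the Hadoop classpath is visible to libhdfs.
import os
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/lib/hadoop/lib/native/'

from pyarrow import fs

# 'default' connects using the cluster's fs.defaultFS configuration.
hdfs = fs.HadoopFileSystem(host='default')

# List the dataset directory used in the example above.
for info in hdfs.get_file_info(fs.FileSelector('/samples/data_10K')):
    print(info.path, info.size)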
The YARN logs are the following:
21/04/28 23:28:31 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:28:31 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:dask.scheduler.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:787
LogContents:
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://XXXXXXXXXXXX:32843
distributed.scheduler - INFO - dashboard at: :34205
distributed.scheduler - INFO - Receive client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Close client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
End of LogType:dask.scheduler.log
***********************************************************************************
End of LogType:prelaunch.err
******************************************************************************
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container
End of LogType:prelaunch.out
******************************************************************************
Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:application.master.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:2700
LogContents:
21/04/28 23:27:54 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/04/28 23:27:54 INFO skein.ApplicationMaster: Running as user hadoop
21/04/28 23:27:54 INFO conf.Configuration: resource-types.xml not found
21/04/28 23:27:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/04/28 23:27:54 INFO skein.ApplicationMaster: Application specification successfully loaded
21/04/28 23:27:55 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8030
21/04/28 23:27:55 INFO skein.ApplicationMaster: gRPC server started at ip-xxxxxxxxx.us-west-1.compute.internal:46405
21/04/28 23:27:56 INFO skein.ApplicationMaster: WebUI server started at ip-xxxxxxxxx.us-west-1.compute.internal:46231
21/04/28 23:27:56 INFO skein.ApplicationMaster: Registering application with resource manager
21/04/28 23:27:56 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:27:56 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
21/04/28 23:27:57 INFO skein.ApplicationMaster: Starting container_1619645048753_0020_01_000002...
21/04/28 23:27:57 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1619645048753_0020_01_000002
21/04/28 23:28:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
21/04/28 23:28:09 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
21/04/28 23:28:09 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Deleted application directory hdfs://ip-XXXXXXXXX.us-west-1.compute.internal:8020/user/hadoop/.skein/application_1619645048753_0020
21/04/28 23:28:09 INFO skein.ApplicationMaster: WebUI server shut down
21/04/28 23:28:09 INFO skein.ApplicationMaster: gRPC server shut down
End of LogType:application.master.log
***************************************************************************************
End of LogType:prelaunch.err
******************************************************************************
Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container
End of LogType:prelaunch.out
******************************************************************************
Does anybody know a workaround for this issue?
Thank you!
Your dask and distributed versions have gone out of sync: dask 2021.4.0 versus distributed 2021.4.1. Updating dask to match should fix this. Note that you also need to ensure that the exact same versions are present in the packed environment you ship to YARN.
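After updating dask so it matches distributed (2021.4.1) and repacking the archive that YarnCluster points at, you can confirm that the client, scheduler, and workers all agree using Client.get_versions. A minimal sketch, reusing the cluster setup from the question:

# Verify that client, scheduler, and workers report matching
# dask/distributed versions after repacking the conda environment.
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='conf/conda_envs/dask_yarn.tar.gz',
                      worker_vcores=1,
                      worker_memory="2GiB")
cluster.scale(2)
client = Client(cluster)

# Raises a ValueError if required package versions differ between
# the client, the scheduler, and any worker.
client.get_versions(check=True)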