I ran into a situation where I need to train on a fairly large dataset (about 22 GiB). I tested two ways of generating random data and training on it with Dask: the data generated by NumPy raises an exception (msgpack: bytes object is too large), while the dask.array version works. Does anybody know why?
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask import array as da
import numpy as np
import xgboost as xgb
import time
def main(client):
    n = 3000
    m = 1000000

    # Data generated by NumPy raises the msgpack exception: the whole
    # in-memory array is embedded in the task graph and must be shipped
    # to the workers.
    X = np.random.random((m, n))
    y = np.random.random((m, 1))
    X = da.from_array(X, chunks=(1000, n))
    y = da.from_array(y, chunks=(1000, 1))

    # Data generated by dask.array works well: only the chunk
    # definitions are sent, and each worker creates its own data.
    # X = da.random.random(size=(m, n), chunks=(1000, n))
    # y = da.random.random(size=(m, 1), chunks=(1000, 1))

    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    del X
    del y

    params = {'tree_method': 'gpu_hist'}
    watchlist = [(dtrain, 'train')]
    start = time.time()
    bst = xgb.dask.train(client, params, dtrain,
                         num_boost_round=100, evals=watchlist)
    print('consumed:', time.time() - start)

if __name__ == '__main__':
    with LocalCUDACluster(n_workers=4, device_memory_limit='12 GiB') as cluster:
        with Client(cluster) as client:
            main(client)
After a few tests I found the reason: da.random.random is also a lazy (delayed) function, so it passes each worker only the definition of the random chunks, not the data itself. When the data comes from an in-memory NumPy array, the array is embedded in the task graph and has to be serialized out to the workers, and msgpack limits the size of each message to 4 GiB. So, in general, you cannot pass more than 4 GiB of in-memory data directly to Dask XGBoost this way. (As a workaround, we can write the data to Parquet and read it back as a chunked dask.dataframe to bypass the msgpack limitation.)
The following commands proved my guess.
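One quick way to see the difference (a sketch I would use, not the original commands) is to compare how large the serialized task graphs are. The graph built from a NumPy array carries the full data; the da.random graph carries only task definitions:

```python
import pickle

import numpy as np
import dask.array as da

m, n = 50000, 10  # small sizes just to illustrate; ~4 MB of data

# Graph that embeds an in-memory NumPy array.
X_np = da.from_array(np.random.random((m, n)), chunks=(1000, n))
# Graph that only describes how to generate each chunk.
X_da = da.random.random(size=(m, n), chunks=(1000, n))

np_graph_bytes = len(pickle.dumps(dict(X_np.__dask_graph__())))
da_graph_bytes = len(pickle.dumps(dict(X_da.__dask_graph__())))

print("from_array graph:", np_graph_bytes, "bytes")
print("da.random graph: ", da_graph_bytes, "bytes")
```

Scaled up to 22 GiB, the first graph is what hits the msgpack 4 GiB message limit, while the second stays tiny no matter how big the described array is.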