I'm trying to specify the folder permissions when using dask.dataframe.to_parquet() with the partition_on argument, as this argument creates a folder for each group in the provided DataFrame column.
I have tried the following code without any luck. Here, I set the mode of the "main" folder passed to to_parquet such that its permissions are drwxrws--- (shown by ls -la), but the folders created by partition_on end up with permissions drwxr-sr-x:
import dask
from pathlib import Path

@dask.delayed
def mkdir(folder_path):
    """Create the target folder with setgid group permissions (mode 2770)."""
    if not folder_path.exists():
        folder_path.mkdir()
    folder_path.chmod(mode=0o2770)
    return folder_path

path = Path('/scratch/...folder_path.../dataset/')
path_delayed = mkdir(path)

dask.delayed(ddf_postext.to_parquet)(
    path_delayed, compression='brotli', write_index=True, append=False,
    partition_on=['p_k10dato_YYYY_MM'], storage_options={'mode': 0o2770},
    engine='fastparquet').compute()
NOTE: the task is dask.delayed as I need to save the parquet files on the same remote machine as where the Dask scheduler/workers are located.
I want to set the folder permissions because my Dask scheduler/workers run as a Unix user in the same group as my user, thus enabling my user to e.g. delete the folders created by the Dask workers, provided the mode and group permissions (27xx) are configured.
In detail: the folder mode is set to "2770". The setgid bit (the leading "2") ensures that new files and subfolders created inside the folder inherit the folder's group. The permission bits "770" give both the owner and group members read, write, and execute permissions on the folder and its files, whereas all other users have no permissions.
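To make the setgid behaviour concrete, here is a minimal standalone sketch (the /tmp path is hypothetical) showing that a file created inside a 2770 folder inherits the folder's group, while its permission bits still follow the process umask:

from pathlib import Path

folder = Path('/tmp/shared_dataset')  # hypothetical example path
folder.mkdir(exist_ok=True)
folder.chmod(0o2770)                  # drwxrws--- : setgid + rwx for user/group

# A file created inside inherits the folder's *group* (the setgid bit);
# its permission bits are still determined by the process umask.
child = folder / 'example.txt'
child.touch()
print(folder.group() == child.group())  # True on Unix: group inherited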
As @mdurant points out, the files should first be saved normally, and the permissions changed afterwards. The new code below solved my folder permission problem:
import os

import dask

@dask.delayed
def save_parquet_files(ddf, folder_path, to_parquet_kwargs=None):
    """Save a Dask DataFrame to parquet files with shared group permissions."""
    to_parquet_kwargs = to_parquet_kwargs or {}
    ddf.to_parquet(folder_path, **to_parquet_kwargs)
    # Set the folder permissions, as the partition_on argument creates a
    # folder for each group in the provided DataFrame column.
    if 'partition_on' in to_parquet_kwargs:
        for root, dirs, _ in os.walk(folder_path):
            for dir_name in dirs:
                os.chmod(os.path.join(root, dir_name), 0o2770)

save_parquet_files(
    ddf_postext, path_delayed, to_parquet_kwargs={
        'compression': 'brotli',
        'append': False,
        'partition_on': ['p_k10dato_YYYY_MM'],
        'engine': 'fastparquet'}).compute()
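To check that the partition folders really ended up with the intended mode, a quick standard-library sketch (assuming path is the dataset folder created above):

import os
import stat

for root, dirs, _ in os.walk(path):
    for dir_name in dirs:
        mode = os.stat(os.path.join(root, dir_name)).st_mode
        print(dir_name, stat.filemode(mode))  # expect 'drwxrws---'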
Correct, storage_options={'mode': 0o2770} has no effect: this is a global value for the file-system, and all files are opened in rb, ab or wb mode with default permissions.
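Since those default permissions come from the worker process's umask, one possible workaround (an assumption on my part, not something to_parquet supports directly) is to relax the umask on every worker before writing, so new files come out group-writable by default. A minimal sketch using dask.distributed (the scheduler address is hypothetical):

import os
from dask.distributed import Client

client = Client('tcp://scheduler-address:8786')  # hypothetical address

def relax_umask():
    # 0o007 keeps full permissions for user and group, none for others
    return os.umask(0o007)

client.run(relax_umask)  # executes on every worker; returns the old umask per worker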
I would recommend saving your files normally, and then making a function to set the permissions afterwards:

import os

def remote_chmod(path):
    for root, dirs, files in os.walk(path):
        for momo in dirs:
            os.chmod(os.path.join(root, momo), 0o2770)
        for momo in files:
            os.chmod(os.path.join(root, momo), 0o2770)

client.submit(remote_chmod, path)
Note that the filesystem fsspec.implementations.dask.DaskWorkerFileSystem contains operations for working with the remote file-system as seen by a Dask worker - but it doesn't have chmod/chown implemented. Indeed, it should be rewritten so that the set of methods available depends on the class of the remote filesystem in question.