Tags: python, dask, distributed, dask-distributed, dask-kubernetes

How to Send .pem file to Dask Cluster?


I have the Dask expression below, which runs a SQLAlchemy query in a distributed way. The connection references a .pem key file that is passed through the connect_args parameter. How do I upload this key file to the Dask cluster's workers so that the query can run?

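import dask.dataframe as dd
import pandas as pd
from dask import delayed
from sqlalchemy import create_engine

# user, password, host, port, catalog, schema, key and queries are defined elsewhere
def execute_query(q):
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                         connect_args={'protocol': 'https',
                                       'requests_kwargs': {'verify': key}})
    return pd.read_sql(q, conn)

df = dd.from_delayed([
    delayed(execute_query)(q) for q in queries])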

I tried using client.upload_file to send the local file to the cluster, but the query still fails because the path to the .pem file cannot be found:

OSError: Could not find a suitable TLS CA certificate bundle, invalid path: hdsj1ptc001.pem
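A likely explanation: client.upload_file does copy the file to every worker, but in current versions of distributed each worker appears to save it under its own local_directory (a scratch directory), keeping only the base name, rather than in its current working directory. A bare relative name such as hdsj1ptc001.pem then fails requests' path check, which raises this OSError. A minimal sketch of building the absolute path inside the task, assuming that storage location; uploaded_file_path is a hypothetical helper, while distributed.get_worker and Worker.local_directory are real distributed APIs:

```python
import os
from typing import Optional


def uploaded_file_path(filename: str, base_dir: Optional[str] = None) -> str:
    """Return the absolute path of a file sent with Client.upload_file.

    Assumption: the worker stored the upload as <local_directory>/<basename>.
    When base_dir is None this must run inside a task on a worker.
    """
    if base_dir is None:
        # Imported lazily so the helper can be exercised without a cluster.
        from distributed import get_worker
        base_dir = get_worker().local_directory
    # upload_file is assumed to keep only the file's base name
    path = os.path.join(base_dir, os.path.basename(filename))
    if not os.path.exists(path):
        raise FileNotFoundError(f"{filename!r} not found in {base_dir!r}")
    return path
```

After client.upload_file('hdsj1ptc001.pem'), the task would pass 'verify': uploaded_file_path('hdsj1ptc001.pem') instead of the bare filename.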

Solution

  • While Dask can move some files around for you (see client.upload_file), you should use your own mechanisms to distribute sensitive files such as credentials to specific locations on the workers' file systems. Options include scp, Kubernetes Secrets and many other methods.

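    If you are certain that your cluster is secure, you could pass the key file's contents as an argument to your function and either write them to a file inside the function (see below) or, if the call allows it, pass the bytes directly. Here it does not: requests' verify option expects a boolean or a path, so the bytes have to be written to a file.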

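    import os
    import dask
    import dask.dataframe as dd
    import pandas as pd
    from sqlalchemy import create_engine

    keyfile = 'keyfile.pem'  # where each worker stores the key, relative to its working directory

    def execute_query(q, key):
        if not os.path.exists(keyfile):   # the data needs to be in a file for 'verify'
            with open(keyfile, 'wb') as f:
                f.write(key)
        conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                             connect_args={'protocol': 'https',
                                           'requests_kwargs': {'verify': keyfile}})
        return pd.read_sql(q, conn)

    # read the key on the client; wrapping it in dask.delayed puts the bytes in the graph only once
    with open('keyfile.pem', 'rb') as f:
        key = dask.delayed(f.read())
    df = dd.from_delayed([
        dask.delayed(execute_query)(q, key) for q in queries])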
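One caveat with the exists-then-write pattern: a worker usually runs several tasks in parallel threads, so one task can pass a half-written file to requests while another is still writing it. A minimal sketch of an atomic, write-once helper (ensure_file is a hypothetical name, not a Dask API) that writes to a temporary file in the same directory and moves it into place with os.replace:

```python
import os
import tempfile


def ensure_file(path: str, data: bytes) -> str:
    """Write `data` to `path` once and return the absolute path.

    The bytes go to a temporary file in the same directory first and are
    then renamed into place with os.replace, which is atomic on POSIX
    systems, so a concurrent reader never sees a partial file.
    An existing file is never overwritten.
    """
    path = os.path.abspath(path)
    if not os.path.exists(path):
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path), suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            os.replace(tmp, path)
        except BaseException:
            # remove the temporary file if writing or renaming failed
            if os.path.exists(tmp):
                os.remove(tmp)
            raise
    return path
```

Inside execute_query, 'verify': ensure_file(keyfile, key) would replace the exists-then-write lines. Because an existing file is kept as-is, a rotated key needs a new filename.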
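For the first option, where the file is provisioned outside Dask (scp, a Kubernetes Secret mounted as a volume in the worker pods, and so on), the task only needs a path that exists on every worker, and no key material travels through the task graph. A minimal sketch, assuming a hypothetical environment variable PRESTO_CA_BUNDLE and a hypothetical default mount path; both must match however the file is actually deployed:

```python
import os

# Hypothetical names: set these to wherever the file is mounted or copied
# on each worker (e.g. the mountPath of a Kubernetes Secret volume).
CA_BUNDLE_ENV = "PRESTO_CA_BUNDLE"
DEFAULT_CA_BUNDLE = "/etc/presto-certs/ca.pem"


def ca_bundle_path() -> str:
    """Return the path of the .pem file provisioned on this worker."""
    path = os.environ.get(CA_BUNDLE_ENV, DEFAULT_CA_BUNDLE)
    if not os.path.isfile(path):
        raise FileNotFoundError(
            f"CA bundle not found at {path!r}; check that the file is "
            f"present on this worker or set {CA_BUNDLE_ENV}"
        )
    return path
```

The query function would then use 'requests_kwargs': {'verify': ca_bundle_path()} and take no key argument at all.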