Tags: python, databricks, workflow, asset-bundle, databricks-asset-bundle

How to use PyDABs to create and configure workflows/pipelines?


At work we wanted more Python configurability for our Databricks workflows, as well as the ability to version control them in our Azure Repos. So we moved away from the standard UI and adopted PyJaws, which is a short-term solution until Databricks releases a fully supported alternative.

After using PyJaws we ran into a number of limitations, and now it's time to move on to PyDABs. The issue is, I have no idea how to make the switch.

I want to create a proof of concept first before moving our existing infrastructure over, and would be grateful for any pointers on the lines of code I will need and the sort of code blocks required.

https://pypi.org/project/databricks-pydabs/


Solution

  • This is a sample script I built a while back. It uses an existing interactive cluster.

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.compute import DataSecurityMode  # used by the cluster-creation snippet below
    from databricks.sdk.service import jobs
    
    w = WorkspaceClient()
    
    # Notebook to run; adjust this path to a notebook in your workspace
    notebook_path = f"/Users/{w.current_user.me().user_name}/workspace client"
    
    # ensure_cluster_is_running() starts the cluster if needed but returns None,
    # so fall back to the known cluster ID
    cluster_id = (
        w.clusters.ensure_cluster_is_running("0828-152531-ze9eyjxc")
        or "0828-152531-ze9eyjxc"
    )
    
    print(cluster_id)
    
    # Create a single-task job that runs the notebook on the existing cluster
    created_job = w.jobs.create(
        name="gc-sdk-test",
        tasks=[
            jobs.Task(
                description="GC SDK Demo",
                existing_cluster_id=cluster_id,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                task_key="test",
                timeout_seconds=0,
            )
        ],
    )
    
    # Trigger the job and block until the run finishes
    run_by_id = w.jobs.run_now(job_id=created_job.job_id).result()
    
    print(run_by_id)
    
    # cleanup
    w.jobs.delete(job_id=created_job.job_id)
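
    The script above relies on default authentication (environment variables or a configured profile) and runs the notebook without parameters. If you need to be explicit about both, the run step could be written roughly as follows; the host, token, and parameter names below are placeholders, not values from the original script:

    # Hypothetical host/token -- replace with your own workspace URL and credentials
    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient(
        host="https://adb-1234567890123456.7.azuredatabricks.net",
        token="dapiXXXXXXXXXXXXXXXX",
    )
    
    # run_now() accepts notebook parameters; .result() blocks until the run completes.
    # created_job is the job created in the script above (before the cleanup step).
    run = w.jobs.run_now(
        job_id=created_job.job_id,
        notebook_params={"env": "dev"},  # hypothetical parameter
    ).result()
    
    print(run.state)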
    

    If you need to create a new cluster instead, try this:

    # Provision a new shared UC cluster under a cluster policy and wait for it to start
    c = w.clusters.create_and_wait(
        policy_id="your_job_policy_id",
        cluster_name="Small-Shared-UC-SDK",
        spark_version="14.1.x-scala2.12",
        node_type_id="i3.xlarge",
        data_security_mode=DataSecurityMode.USER_ISOLATION,
        spark_env_vars={"SOMEVAR": "Value1", "SOMEVAR2": "2"},
        spark_conf={
            "spark.name": "world",
            "spark.hadoop.datanucleus.autoCreateSchema": "true",
        },
    )
    

    and then read the cluster ID from the result:

    cluster_id = c.cluster_id
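
    Putting the two snippets together: once the new cluster is running, its ID can replace the interactive cluster in the job definition, and the cluster can be terminated when you are done. A minimal sketch, reusing w, jobs, notebook_path and c from above (the job name here is made up):

    # Point the task at the freshly created cluster instead of the interactive one
    new_job = w.jobs.create(
        name="gc-sdk-test-new-cluster",  # hypothetical name
        tasks=[
            jobs.Task(
                description="GC SDK Demo on new cluster",
                existing_cluster_id=c.cluster_id,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                task_key="test",
            )
        ],
    )
    
    w.jobs.run_now(job_id=new_job.job_id).result()
    
    # cleanup: remove the job and terminate (not permanently delete) the cluster
    w.jobs.delete(job_id=new_job.job_id)
    w.clusters.delete(cluster_id=c.cluster_id)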