Search code examples
pythonconcurrencyparallel-processingdata-engineeringdlt

Parallelising dlthub Rest API pipeline


I am trying to speed up the following dlt (dlthub) pipeline via parallelisation as shown in the documentation here: https://dlthub.com/docs/reference/performance#parallelism

Here is the original (NOT parallelised) code which works

source = rest_api_source({
    "client": {
        "base_url": "https://www.filmweb.no/",
    },
    "resources": [
        {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }
        for MOVIE_ID in MOVIE_IDS
    ],
})

pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
load_info = pipeline.run(source)

I cannot work out how to actually parallelise this. The following code block...

@dlt.resource(parallelized=True)
def movies(movie_ids):
    for MOVIE_ID in movie_ids:
        yield {
            "name": f"movie_{MOVIE_ID}",
            "table_name": "movies",
            "endpoint": {
                "path": "_next/data/{build_id}/film/{movie_id}.json",
                "params": {
                    "movie_id": MOVIE_ID,
                    "build_id": BUILD_ID,
                    "edi": MOVIE_ID,
                },
                "data_selector": "pageProps.cmsDocument",
            },
            "write_disposition": "replace",
        }

@dlt.source
def movies_source(movie_ids):
    return [rest_api_source({
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [movies(movie_ids)]
    })]

pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
pipeline.run(movies_source(MOVIE_IDS))

...gives me the error:

ResourceNameMissing: Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
        Please note that for resources created from functions or generators, the name is the function name by default.

I have tried a lot of things e.g. taking the rest_api_source out of a list within the movies_source function; explicitly specifying the name within the @dlt.resource wrapper; taking the name entry out of the yielded dict within the movies resource. None of this is fixing anything.


Solution

  • In the dlt REST API declarative object you can define resources passing the arguments that you can use in a normal dlt resource. In your case parallelized=True.

    So, your config object would be:

    source: RESTAPIConfig = rest_api_source({
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [
            {
                "name": f"movie_{MOVIE_ID}",
                "table_name": "movies",
                "endpoint": {
                    "path": "_next/data/{build_id}/film/{movie_id}.json",
                    "params": {
                        "movie_id": MOVIE_ID,
                        "build_id": BUILD_ID,
                        "edi": MOVIE_ID,
                    },
                    "data_selector": "pageProps.cmsDocument",
                },
                "write_disposition": "replace",
            }
            for MOVIE_ID in MOVIE_IDS
        ],
    })
    

    I suggest to use the type hint RESTAPIConfig to have your IDE suggesting you which are the available arguments for the REST API config object.