
Is there any method to concatenate/unite DynamicFrame objects in AWS Glue?


I've tried to concatenate a set of DynamicFrame objects to create a single, larger one within a Glue job. According to the Glue docs, only a few methods are available for the DynamicFrameCollection class, and none of them allows this kind of operation. Has anyone tried something similar?

A collection is a structure indexed by keys. Within glueContext it looks like the following, where each datasource object is a parsed table in Parquet format:

from awsglue.dynamicframe import DynamicFrameCollection

df_dic = {
    "datasource0": datasource0,
    "datasource1": datasource1,
    "datasourcen": datasourcen,
}
dfc = DynamicFrameCollection(dynamic_frames=df_dic, glue_ctx=glueContext)
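Instead of listing datasource0 to datasourcen by hand, the same keyed dictionary can be generated from list_tables. This is a minimal sketch: `build_frame_dict` and the `read_table` callable are hypothetical names, and the reader is passed in so the helper itself has no Glue dependency.

```python
from typing import Callable, Dict, List, TypeVar

Frame = TypeVar("Frame")


def build_frame_dict(
    tables: List[str], read_table: Callable[[str], Frame]
) -> Dict[str, Frame]:
    """Map datasource0, datasource1, ... to one frame per table, in list order.

    `read_table` is a hypothetical callable (for example, one wrapping
    create_dynamic_frame.from_options) that returns the frame for one table.
    """
    return {f"datasource{i}": read_table(table) for i, table in enumerate(tables)}
```

In the job, the result would then be passed as `dynamic_frames=` to DynamicFrameCollection, just like `df_dic` above.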

Each DynamicFrame is read with the create_dynamic_frame.from_options method:

datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://{ENV_BUCKET}/parquet/{list_tables[0]}/store_module={store_module}"
        ]
    },
    format="parquet",
    # format_options={},
    transformation_ctx="datasource0",
)
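Because the S3 `paths` connection option takes a list, another way to get one combined frame may be to pass every table's path to a single from_options call. This is a sketch under assumptions: `table_paths` and `read_tables_in_one_pass` are hypothetical helpers, the tables are assumed to have compatible schemas, and since no primary keys are involved (unlike mergeDynamicFrame), rows with duplicate ids should all be kept. `glue_context` is passed in rather than imported so the module does not depend on the Glue runtime.

```python
from typing import Any, List


def table_paths(bucket: str, tables: List[str], store_module: int) -> List[str]:
    """Build one S3 path per table, using the same layout as the read above."""
    return [
        f"s3://{bucket}/parquet/{table}/store_module={store_module}"
        for table in tables
    ]


def read_tables_in_one_pass(
    glue_context: Any, bucket: str, tables: List[str], store_module: int
) -> Any:
    """Read the store_module partition of every table into one DynamicFrame.

    `glue_context` is assumed to be an awsglue GlueContext.
    """
    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": table_paths(bucket, tables, store_module)},
        format="parquet",
        # Hypothetical context name for this single combined read.
        transformation_ctx="datasource_all",
    )
```

Usage in the job would look like `combined = read_tables_in_one_pass(glueContext, ENV_BUCKET, list_tables, store_module)`.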

Solution

  • I ended up using a workaround. It can be done with the lower-level DynamicFrame API, without converting to a Spark DataFrame, by merging the frames iteratively with the mergeDynamicFrame method.

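    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.context import SparkContext

    glueContext = GlueContext(SparkContext.getOrCreate())
    logger = glueContext.get_logger()

    # ENV_BUCKET, list_tables and store_module are assumed to be defined
    # earlier in the job (e.g. from job arguments).


    def CustomTransform(prefix: str, store_module: int) -> DynamicFrame:
        """Read one Parquet table partition from S3 as a DynamicFrame.

        Parameters
        ----------
        prefix : str
            Table name, i.e. the folder under s3://{ENV_BUCKET}/parquet/.
        store_module : int
            Value of the store_module partition to read.

        Returns
        -------
        DynamicFrame
            The rows of that table partition.
        """
        logger.info(f"Fetching DynamicFrame: {prefix}")
        datasource = glueContext.create_dynamic_frame.from_options(
            connection_type="s3",
            connection_options={
                "paths": [
                    f"s3://{ENV_BUCKET}/parquet/{prefix}/store_module={store_module}"
                ]
            },
            format="parquet",
            # Unique per table so job bookmarks can track each source separately.
            transformation_ctx=f"datasource_{prefix}",
        )
        return datasource


    datasource0 = CustomTransform(list_tables[0], store_module)
    # Fold each remaining table in list_tables into the running result,
    # matching records on the "id" primary key.
    for idx, prefix in enumerate(list_tables[1:], start=1):
        datasourcex = CustomTransform(prefix, store_module)
        datasource0 = datasource0.mergeDynamicFrame(
            stage_dynamic_frame=datasourcex,
            primary_keys=["id"],
            transformation_ctx=f"swp_datasource_{idx}",
        )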
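Note that mergeDynamicFrame is a keyed merge rather than a plain concatenation: according to the AWS Glue documentation, when a staging record has the same primary_keys as a source record, the staging record overwrites it, while unmatched source records (duplicates included) are kept. So the loop only behaves like a union when `id` values do not repeat across tables. The pure-Python model below is a simplification on lists of dicts, not Glue code; `merge_model` and `union_model` are hypothetical names, and it assumes staging-only records are appended, which the loop relies on.

```python
from functools import reduce
from typing import Any, Dict, List, Sequence

Record = Dict[str, Any]


def merge_model(
    source: List[Record], staging: List[Record], primary_keys: Sequence[str] = ("id",)
) -> List[Record]:
    """Model of a keyed merge: staging records replace source records with the same key."""
    def key(r: Record) -> tuple:
        return tuple(r[k] for k in primary_keys)

    staging_keys = {key(r) for r in staging}
    kept = [r for r in source if key(r) not in staging_keys]
    return kept + staging  # staging-only records are appended (assumption)


def union_model(source: List[Record], staging: List[Record]) -> List[Record]:
    """Plain concatenation: every record from both inputs survives."""
    return source + staging


tables = [
    [{"id": 1, "table": "t0"}, {"id": 2, "table": "t0"}],
    [{"id": 2, "table": "t1"}, {"id": 3, "table": "t1"}],
]

# Folding left over the tables mirrors the for-loop in the solution.
merged = reduce(merge_model, tables)
unioned = reduce(union_model, tables)
print(len(merged), len(unioned))  # 3 4
```

Here the row with `id` 2 from t0 is lost in the merged result but kept in the union.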