
Reduce a partitioned asset to a single data frame


I have a software-defined asset that compares a (very) large set of partitions to a single small data frame. I've made a toy example that shows the situation:

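import dagster
import numpy as np
import pandas as pd

partitions = dagster.StaticPartitionsDefinition(["a", "b"])

@dagster.asset(partitions_def=partitions)
def large_dataframes(context):
    # One (large) frame per partition key
    return pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))

@dagster.asset
def small_dataframe(context):
    # Unpartitioned: a single small frame
    return pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('DEFG'))

@dagster.asset(partitions_def=partitions)
def one_df_filtered_by_the_other(context, large_dataframes, small_dataframe):
    # Runs once per partition; both frames share column "D", so rsuffix is
    # needed to avoid a column-overlap error in DataFrame.join
    return large_dataframes.join(small_dataframe, rsuffix=".")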

which is great; works a treat. Clear and easy to read (in general: loving dagster).

However, the output of the third asset is now a partitioned data frame, even though in practice many of the partitions will be empty and the non-empty ones will be teensy.

Ideally, I'd like to "reduce" the partitions to a single data frame, which itself becomes a new asset I can depend on downstream.

The way I'm doing it, though, feels like a hack, and although it seems to work, I can't shake the sense that I'm missing something. What I've noticed is that if you omit the partitions_def, the asset shows up as a dictionary of dataframes. So I can "reduce" them like this:

@dagster.asset
def reduce_to_unpartitioned(context, one_df_filtered_by_the_other):
    # No partitions_def here, so the partitioned input arrives as a dict of DataFrames
    return pd.concat(one_df_filtered_by_the_other.values())
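A slightly more defensive version of the reduce step can be sketched in plain pandas, assuming (as observed above) that the input arrives as a dict whose keys are the partition keys and whose values are DataFrames. The helper name `reduce_partitions` and the `partition` index level are hypothetical choices for illustration. It drops empty partitions before concatenating, keeps the partition key as the outer index level so each row can be traced back, and returns an empty frame instead of letting `pd.concat` raise `ValueError` when there is nothing to concatenate.

```python
import pandas as pd


def reduce_partitions(partitions: dict) -> pd.DataFrame:
    """Concatenate a {partition_key: DataFrame} mapping into one DataFrame.

    Empty partitions are dropped first, and the partition key is kept as the
    outer level of the resulting row index.
    """
    non_empty = {key: df for key, df in partitions.items() if not df.empty}
    if not non_empty:
        # pd.concat raises ValueError when given nothing to concatenate.
        return pd.DataFrame()
    # Passing a mapping makes its keys the outer index level, named "partition".
    return pd.concat(non_empty, names=["partition"])
```

Inside the unpartitioned asset, the body would then be `return reduce_partitions(one_df_filtered_by_the_other)`. Keeping the key in the index (rather than discarding it via `.values()`) is a design choice; call `.reset_index(level="partition")` if a column is more convenient downstream.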

I'm quite nervous, though, about whether this will behave as I hope once the real thing is in production and the number of partitions is in the hundreds or thousands, mostly because I haven't been able to find an "official" approach in the docs.

Does the above look right? Is there a better way?


Solution

  • Your proposed solution is correct as of v1.3.4 of dagster. The dictionary you're seeing is the behavior of the default UPathIOManager when loading a partitioned input asset in an unpartitioned run. This can also be customized by providing your own IO manager.

    I think our docs are a little weak on partitioned IO management, but partitions in general are an area of heavy development, so our docs will improve here in the coming months.
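If holding every partition in memory at once becomes a concern at hundreds or thousands of partitions, the custom IO manager route mentioned above opens up a lazier pattern. The sketch below is plain pandas, not Dagster API: `reduce_lazily` and the `load_partition` callable are hypothetical stand-ins for whatever per-partition read a custom IO manager might perform. It loads partitions one at a time and retains only the non-empty ones, tagging each row with its partition key.

```python
from typing import Callable, Iterable

import pandas as pd


def reduce_lazily(
    partition_keys: Iterable[str],
    load_partition: Callable[[str], pd.DataFrame],
    key_column: str = "partition",
) -> pd.DataFrame:
    """Load partitions one at a time, keeping only the non-empty ones.

    `load_partition` is a hypothetical callable that reads a single
    partition (for example, something a custom IO manager could supply).
    """
    kept = []
    for key in partition_keys:
        df = load_partition(key)
        if not df.empty:
            # Tag rows with their source partition before keeping them.
            kept.append(df.assign(**{key_column: key}))
        # Empty frames are not retained, so only non-empty partitions accumulate.
    if not kept:
        # Nothing to concatenate; avoid the ValueError from pd.concat([]).
        return pd.DataFrame()
    return pd.concat(kept, ignore_index=True)
```

Whether this actually saves memory depends on the loader not caching what it reads; with mostly empty partitions, the retained data stays close to the size of the small non-empty ones.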