Tags: scala, apache-spark, non-deterministic

Non-deterministic fields getting recalculated between showing, counting, and saving a DataFrame


We have a UUID UDF:

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.functions.udf
val idgen = new AtomicLong(0L)  // counter; not shown in the original, assumed to be an AtomicLong
val idUdf = udf(() => idgen.incrementAndGet.toString + "_" + UUID.randomUUID)
spark.udf.register("idgen", idUdf)

The issue is that each action recomputes the UDF, so count, show, and write each end up with different values for the generated column:

    df.count()             // generates a UUID for each row
    df.show()              // regenerates a UUID for each row
    df.write.parquet(path) // .. you get the picture ..

What approaches might be taken to retain a single UUID result for a given row? The first thought would be to invoke a remote key-value store keyed on some unique combination of other stable fields within each row. That is, of course, expensive both because of the per-row lookup and because of the configuration and maintenance of the remote KV store. Are there other mechanisms to achieve stability for these unique ID columns?
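For comparison, one alternative that avoids a remote store entirely is to derive the ID deterministically from the stable fields themselves, for example by hashing their concatenation, so every re-evaluation produces the same value. A minimal sketch, assuming hypothetical stable columns `customer_id` and `event_time`:

    import org.apache.spark.sql.functions.{col, concat_ws, sha2}

    // Hypothetical stable columns; substitute whichever fields uniquely identify a row.
    val withStableId = df.withColumn(
      "row_id",
      sha2(concat_ws("||", col("customer_id"), col("event_time")), 256)
    )

Because the hash is a pure function of the input columns, count, show, and write all see the same row_id, at the cost of losing the incrementing-counter prefix.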


Solution

  • Define your UDF as non-deterministic by calling asNondeterministic():

    val idUdf = udf(() => idgen.incrementAndGet.toString + "_" + UUID.randomUUID)
        .asNondeterministic()
    

    This tells the optimizer not to re-evaluate the UDF multiple times within a query plan, so the value is computed once per row rather than regenerated for every projection. Note that separate actions still re-execute the plan, so cache the DataFrame if the same values must be seen across count, show, and write.
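
    For completeness, a minimal end-to-end sketch, assuming idgen is an AtomicLong counter and the column is named id; the cache() call is an optional extra step to pin the generated values across separate actions:

        import java.util.UUID
        import java.util.concurrent.atomic.AtomicLong
        import org.apache.spark.sql.functions.udf

        val idgen = new AtomicLong(0L)  // assumed counter, as in the question

        // Non-deterministic, so the optimizer will not re-evaluate it within a plan.
        val idUdf = udf(() => idgen.incrementAndGet.toString + "_" + UUID.randomUUID)
          .asNondeterministic()

        val withId = df.withColumn("id", idUdf()).cache()  // cache so later actions reuse the computed ids

        withId.count()              // ids generated once
        withId.show()               // same ids as above
        withId.write.parquet(path)  // same ids written out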