apache-spark, pyspark, avro, emr, spark-avro

In Spark, how do I convert multiple DataFrames into an Avro file?


I have a Spark job that processes some data into several individual DataFrames. I store these DataFrames in a list, i.e. dataframes[]. Eventually, I'd like to combine them into a hierarchical structure and write the output in Avro. The Avro schema is something like this:

{
    "name": "mydata",
    "type": "record",
    "fields": [
        {"name": "data", "type": {
            "type": "array", "items": {
                "name": "actualData", "type": "record", "fields": [
                    {"name": "metadata1", "type": "int"},
                    {"name": "metadata2", "type": "string"},
                    {"name": "dataframe", "type": {
                        "type": "array", "items": {
                            "name": "dataframeRecord", "type": "record", "fields": [
                                {"name": "field1", "type": "int"},
                                {"name": "field2", "type": "int"},
                                {"name": "field3", "type": ["string", "null"]}
                            ]
                        }
                    }}
                ]
            }
        }}
    ]
}

As can be inferred from the schema, each DataFrame has three fields, field1, field2, and field3, which I'd like to write as an array in the Avro file. There is also some metadata associated with each DataFrame.
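To make the layout concrete, here is roughly what one of these DataFrames and its metadata might look like in PySpark (the sample values and the (metadata1, metadata2, df) tuples are illustrative assumptions, not part of the actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# One DataFrame with the three columns from the schema; field3 is nullable.
df = spark.createDataFrame(
    [(1, 10, "a"), (2, 20, None)],
    ["field1", "field2", "field3"])

# The dataframes list mentioned above, with each DataFrame paired with its metadata.
dataframes = [(42, "run-A", df)]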

My current approach is to write the DataFrames to S3 once the data are processed, then use a separate program to pull the data back from S3, write an Avro file with the avro library, and upload that file to S3 again.

However, as the amount of data grows, this is becoming very slow. I've looked into the Databricks spark-avro library for writing Avro files directly from Spark, but I don't know how to combine the DataFrames in memory, or how that library could pick up the schema I'm using.
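For reference, that library writes a single flat DataFrame like this (the output path is a placeholder), which is part of why I'm unsure how to produce the nested, multi-DataFrame structure above:

# Writing one flat DataFrame with the Databricks spark-avro package.
df.write.format("com.databricks.spark.avro").save("s3://my-bucket/output/")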

Is there an idiomatic way to do this in Spark?

P.S. I'm using EMR with Spark 2.0.0 in Python.


Solution

  • I've figured out a solution, specific to PySpark:

    With each DataFrame, I used .collect() to get a list of Rows. For each Row object, I called asDict() to get a dictionary. From there, I was able to construct the nested list of dictionaries with a simple loop. Once I had this list of dictionaries, the data exited Spark and entered pure Python territory, where it was "easier" to handle (but less efficient). A sketch of this approach is shown below.

    Alternatively, had I chosen Scala over Python, I might have been able to convert each DataFrame into a Dataset, which seems to provide a handful of methods for the operations I need, but that's another story altogether.
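
    Below is a minimal sketch of what the collect()/asDict() approach looks like, assuming the schema from the question is saved as mydata.avsc, that dataframes holds (metadata1, metadata2, DataFrame) tuples, and that fastavro is the Avro writer; the exact library, file names, and tuple layout are assumptions, not requirements:

    import json
    from fastavro import parse_schema, writer

    # Parse the Avro schema shown in the question (assumed to be saved as mydata.avsc).
    with open("mydata.avsc") as f:
        schema = parse_schema(json.load(f))

    entries = []
    for metadata1, metadata2, df in dataframes:  # hypothetical (metadata, DataFrame) tuples
        # Pull each DataFrame onto the driver and turn its Rows into plain dicts.
        rows = [row.asDict() for row in df.collect()]
        entries.append({"metadata1": metadata1,
                        "metadata2": metadata2,
                        "dataframe": rows})

    # Write a single top-level "mydata" record whose "data" field is the list built above.
    with open("mydata.avro", "wb") as out:
        writer(out, schema, [{"data": entries}])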