python-3.x, apache-spark, parquet, databricks, pyarrow

Are Apache Spark 2.0 parquet files incompatible with Apache Arrow?


The problem

I have written an Apache Spark DataFrame as a parquet file for a deep learning application in a Python environment; I am currently running into issues with basic examples of both the petastorm (following this notebook) and horovod frameworks, specifically when reading the aforementioned file. The DataFrame has the following type: DataFrame[features: array<float>, next: int, weight: int] (much like in Databricks' notebook, features was originally a VectorUDT, which I converted to an array).
In both cases, Apache Arrow throws an ArrowIOError: Invalid parquet file. Corrupt footer. error.
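
For reference, the VectorUDT-to-array conversion mentioned above looks roughly like this (a UDF-based sketch; pyspark.ml.functions.vector_to_array would be an alternative on Spark 3.0+):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Convert the Spark-specific VectorUDT column into a plain array<float> column,
# so that the written parquet file only contains standard types.
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
df = df.withColumn('features', to_array('features'))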

What I found until now

I discovered in this question and in this PR that, as of version 2.0, Spark doesn't write _metadata or _common_metadata files unless spark.hadoop.parquet.enable.summary-metadata is set to true in Spark's configuration; those files are indeed missing.
I thus tried rewriting my DataFrame with this setting enabled, but still got no _common_metadata file. What does work is explicitly passing a schema to petastorm when constructing a reader (passing schema_fields to make_batch_reader, for instance; see the sketch below). That is a problem with horovod, however, as there is no such parameter in horovod.spark.keras.KerasEstimator's constructor.
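
For illustration, the petastorm workaround looks roughly like this (a sketch; the dataset URL is a placeholder and schema_fields takes column names or regex patterns):

from pyspark.sql import SparkSession
from petastorm import make_batch_reader

# Session with summary metadata enabled at write time (the setting mentioned
# above; in my case it still did not produce a _common_metadata file).
spark = (SparkSession.builder
         .config('spark.hadoop.parquet.enable.summary-metadata', 'true')
         .getOrCreate())

# Passing schema_fields explicitly spares petastorm from inferring the schema.
with make_batch_reader('file:///path/to/df.parquet',
                       schema_fields=['features', 'next', 'weight']) as reader:
    for batch in reader:
        pass  # iterate over batches of rows for training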

How could I, if at all possible, either make Spark output those files, or make Arrow infer the schema, just as Spark seems to do?

Minimal example with horovod

# Saving df
print(spark.conf.get('spark.hadoop.parquet.enable.summary-metadata'))  # outputs 'true'
df.repartition(10).write.mode('overwrite').parquet(path)

# ...

# Training
import horovod.spark.keras as hvd
from horovod.spark.common.store import Store
from tensorflow.keras.optimizers import Adadelta  # import needed for the optimizer below

model = build_model()  # model-building function defined elsewhere
opti = Adadelta(learning_rate=0.015)
loss = 'sparse_categorical_crossentropy'
# Store for the training/validation data paths
store = Store.create(prefix_path=prefix_path,
                     train_path=train_path,
                     val_path=val_path)
keras_estimator = hvd.KerasEstimator(
    num_proc=16,
    store=store,
    model=model,
    optimizer=opti,
    loss=loss,
    feature_cols=['features'],
    label_cols=['next'],
    batch_size=auto_steps_per_epoch,
    epochs=auto_nb_epochs,
    sample_weight_col='weight'
)

keras_model = keras_estimator.fit_on_parquet() # Fails here with ArrowIOError

Solution

  • The problem is solved in pyarrow 0.14+ (issues.apache.org/jira/browse/ARROW-4723); be sure to install the updated version with pip (up until Databricks Runtime 6.5, the included version is 0.13).
    Thanks to @joris' comment for pointing this out. A quick version check is sketched below.
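
As a sanity check after upgrading (a minimal sketch; the upgrade itself would be something like pip install 'pyarrow>=0.14' in the cluster's Python environment):

# Confirm the runtime actually picks up pyarrow >= 0.14, which contains the
# fix for the "Corrupt footer" error described above.
import pyarrow
print(pyarrow.__version__)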