Search code examples
pythonpysparkparquetfeature-storemlrun

Read parquet in MLRun, "Unable to infer schema for Parquet. It must be specified manually."


I got this issue, when I ingested/wrote data to FeatureSet (part of MLRun FeatureStore) and than I read the data via PySpark (it seems as invalid parquet). See exception:

AnalysisException                         Traceback (most recent call last)
<ipython-input-8-a8c688f9ceb5> in <module>
----> 1 newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
      2 newDF1.show()

/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
    299                        int96RebaseMode=int96RebaseMode)
    300 
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302 
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,

/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323 
   1324         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Unable to infer schema for Parquet. It must be specified manually.

See the key part of source code (which generated the exception):

...
feature_set1=fstore.FeatureSet(name="FS-ingest",entities=[fstore.Entity('app'),fstore.Entity('id')],engine="spark",timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1",partitioned=False),NoSqlTarget(name="s2")],with_defaults=False)
feature_set1.save()
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF,spark_context=spark, overwrite=True)
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()

Did you see similar issue?

NOTE: Parquet path contains parquet files (all files are valid), it means the ingestion was succesful.


Solution

  • The source code (usage of parquet) contains mistake. The FeatureSet used two targets, online and offline store and in this case, the spark.read.parquet affected also online storage, where is different format than parquet. I see two possible solutions.

    1. Update parquet read part

    It is easy way, how to solve the issue. Simple, extend/add current path /parquet, see updated code:

    ...
    newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
    newDF1.show()
    ...
    

    2. Remove online/NoSql target

    It is about update of FeatureSet definition (remove NoSqlTarget(name="s2")) and keep spark.read.parquet part see updated code:

    ...
    feature_set1=fstore.FeatureSet(name="FS-ingest",entities=[fstore.Entity('app'),fstore.Entity('id')],engine="spark",timestamp_key='time')
    feature_set1.set_targets(targets=[ParquetTarget(name="s1",partitioned=False)],with_defaults=False)
    feature_set1.save()
    
    newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
    newDF1.show()
    ...
    

    BTW: The same solution is valid also for this different exception, which contains more exact issue description (with view to the different path to online and offline store):

    Py4JJavaError: An error occurred while calling o3233.parquet.
    : java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
        v3io://projects/spark-parquet-test2/featurestore/FS-ingest/nosql/sets/FS-ingest/1674747966078_84
        v3io://projects/spark-parquet-test2/featurestore/FS-ingest/parquet/sets/FS-ingest/1674747966078_84
    
    If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
        at scala.Predef$.assert(Predef.scala:223)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:178)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:110)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:158)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:73)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)