python, pyspark, parquet, mlrun

PySpark, parquet "AnalysisException: Unable to infer schema for Parquet"


I hit this issue when reading data from Parquet via PySpark in MLRun (it looks like an invalid Parquet file). See the exception:

-------------------------------------------------
AnalysisException                   Traceback (most recent call last)
<ipython-input-19-c76c691629bf> in <module>
----> 1 new_DF=spark.read.parquet('v3io:///projects/risk/FeatureStore/pbr/parquet/') 
      2 new_DF.show()
      
/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
    299         int96RebaseMode=int96RebaseMode)
    300
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
   
   
/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323
   1324         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115         # Hide where the exception came from that shows a non-Pythonic
    116         # JVM exception message.
--> 117         raise converted from None
    118     else:
    119         raise

AnalysisException: Unable to infer schema for Parquet. It must be specified manually.

Here is the key part of the source code that generated the exception:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test') \
    .config("spark.executor.memory", "9g") \
    .config("spark.executor.cores", "3") \
    .config('spark.cores.max', 12) \
    .getOrCreate()

new_DF = spark.read.parquet("v3io:///projects/risk/FeatureStore/pbr/parquet/")
new_DF.show()

Did you get a similar issue?


Solution

  • The issue was caused by problematic Parquet files.

    When I converted these files from Parquet to CSV format, I could see these differing column headers:

    file> a09*.parquet
    ...
    pbr_cnt_3M_up(Int64)    
    pbr_cnt_3m_up(Int64)    
    pbr_kept_rate_3M(Double)    
    pbr_kept_rate_3m(Double)    
    ...
    
    file> b87*.parquet
    ...
    pbr_cnt_3m_up(Int64)    
    pbr_kept_rate_3M(Double)    
    ...
    

    The issue was that we had near-duplicate column names that differed only in letter case. PySpark was not able to unify these differences when merging the file schemas; a quick check for such conflicts is sketched below.
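
    One way to confirm this kind of conflict is to load each Parquet file separately and compare its column names case-insensitively. This is only a minimal sketch; the file paths below are placeholders for the real a09*/b87* files:

    from collections import defaultdict
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('SchemaCheck').getOrCreate()

    # Use case-sensitive resolution so columns that differ only in case
    # can be loaded side by side for inspection.
    spark.conf.set('spark.sql.caseSensitive', 'true')

    # Map each lowercased column name to the spellings actually seen.
    seen = defaultdict(set)
    for path in ['/path/to/a09.parquet', '/path/to/b87.parquet']:
        df = spark.read.parquet(path)
        for name in df.columns:
            seen[name.lower()].add(name)

    # Any name with more than one spelling differs only in case.
    for lowered, variants in seen.items():
        if len(variants) > 1:
            print(f"case conflict for '{lowered}': {sorted(variants)}")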

    The solution was to recreate these Parquet files, removing the column-name differences and using unique, lowercase-only column names.
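
    A minimal sketch of one way to do that rewrite in PySpark, assuming the duplicated column names shown above (the paths are placeholders for the real files):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('FixParquet').getOrCreate()

    # Keep resolution case-sensitive so the near-duplicate columns stay
    # distinguishable while we clean them up.
    spark.conf.set('spark.sql.caseSensitive', 'true')

    df = spark.read.parquet('/path/to/a09.parquet')

    # Drop the redundant uppercase variants (assumption: they duplicate
    # the lowercase columns), then lowercase whatever remains.
    df = df.drop('pbr_cnt_3M_up', 'pbr_kept_rate_3M')
    df = df.toDF(*[c.lower() for c in df.columns])

    # Write the cleaned data to a new location and swap it in afterwards.
    df.write.mode('overwrite').parquet('/path/to/fixed/a09.parquet')

    After rewriting all affected files this way, the original spark.read.parquet call over the whole directory should be able to infer a single, consistent schema.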