I have been trying to load Spark DataFrames from Parquet files in all of the directories except the metadata directory. The directory structure looks like this:
dumped_data/
- time=19424145
- time=19424146
- time=19424147
- _spark_metadata
The main goal is to avoid reading data from the _spark_metadata directory. I have created a solution, but for some reason it constantly returns empty values. What could be the reason for that?
Here is the solution:
val dirNamesRegex: Regex = s"\\_spark\\_metadata*".r

def transformDf: Option[DataFrame] = {
  val filesDf = listPath(new Path(feedPath))(fsConfig)
    .map(_.getName)
    .filter(name => !dirNamesRegex.pattern.matcher(name).matches)
    .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))

  if (!filesDf.isEmpty)
    Some(filesDf.reduce(_ union _))
  else None
}
listPath is a custom method for listing data files in HDFS; feedSchema is of type StructType.
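For reference, here is a minimal sketch of what such a helper could look like, assuming listPath simply wraps the Hadoop FileSystem API and fsConfig is a Hadoop Configuration (both are assumptions, since the actual implementation is not shown):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical stand-in for the custom listPath helper: it lists the
// immediate children of a directory through the Hadoop FileSystem API.
def listPath(dir: Path)(conf: Configuration): List[Path] = {
  val fs = FileSystem.get(conf)
  fs.listStatus(dir).map(_.getPath).toList
}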
Without the if guard around Some and None I get this exception:
java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:137)
at scala.collection.immutable.List.reduceLeft(List.scala:84)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:208)
at scala.collection.AbstractTraversable.reduce(Traversable.scala:104)
In your code you have 3 problems:
1. You don't need regex matching here: you know the concrete name of the directory to exclude, so just filter by name with a plain comparison (name != "_spark_metadata").
2. filesDf is something like Traversable[DataFrame]. If you want to reduce it safely even when the collection is empty, you can use reduceLeftOption instead of reduce (see the small sketch right after this list).
3. In the transformDf method you are both filtering directory names and reading data with Spark, which makes it heavy to debug. I would advise you to divide the logic into 2 different methods: the first reads the directories and filters them, the second reads the data and unions it into one general DataFrame.
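As a quick illustration of the difference between reduce and reduceLeftOption (plain Scala collections, independent of Spark):

// reduce throws on an empty collection; reduceLeftOption returns None instead.
val empty: List[Int] = Nil
val nonEmpty: List[Int] = List(1, 2, 3)

// empty.reduce(_ + _)            // throws UnsupportedOperationException: empty.reduceLeft
empty.reduceLeftOption(_ + _)     // None
nonEmpty.reduceLeftOption(_ + _)  // Some(6)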
I propose the following code samples:
Case without dividing the logic:
def transformDf: Option[DataFrame] = {
  listPath(new Path(feedPath))(fsConfig)
    .map(_.getName)
    .filter(name => name != "_spark_metadata") // exclude the metadata directory
    .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))
    .reduceLeftOption(_ union _)               // None when no data was read
}
Case with divided logic:
def getFilteredPaths: List[String] =
  listPath(new Path(feedPath))(fsConfig)
    .map(_.getName)
    .filter(name => name != "_spark_metadata") // exclude the metadata directory

def transformDf: Option[DataFrame] = {
  getFilteredPaths
    .flatMap(path => sparkSession.parquet(Some(feedSchema))(path))
    .reduceLeftOption(_ union _)
}
With the second approach you can write some lightweight unit tests to debug your path extraction, and once you have the correct paths you can easily read the data from the directories and union it.
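For example, a minimal ScalaTest sketch, assuming you extract the filtering into a pure helper (the names PathFiltering and filterDataDirs are made up for illustration):

import org.scalatest.funsuite.AnyFunSuite

// Hypothetical pure helper factored out of getFilteredPaths so it can be
// tested without HDFS or Spark.
object PathFiltering {
  def filterDataDirs(names: List[String]): List[String] =
    names.filter(_ != "_spark_metadata")
}

class PathFilteringSpec extends AnyFunSuite {
  test("the metadata directory is excluded") {
    val names = List("time=19424145", "time=19424146", "_spark_metadata")
    assert(PathFiltering.filterDataDirs(names) == List("time=19424145", "time=19424146"))
  }
}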