In my Scala/Spark application, I create a DataFrame that I plan to use several times throughout the program, which is why I decided to call .cache() on it. As you can see, inside the loop I filter the DataFrame several times with different values, but for some reason .count() always returns the same result, when in fact it should return two different count values. I also noticed strange behavior in Mesos: it feels like the .cache() method is not being executed. After creating the DataFrame, the program reaches this part of the code, if (!df.head(1).isEmpty), and spends a very long time on it. I assumed that the caching process would run for a long time, and the other operations would then use this cache and run quickly. What do you think is the problem?
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// spark is the active SparkSession
var df: DataFrame = spark
  .read
  .option("delimiter", "|")
  .csv("/path_to_the_files/")
  .filter(col("col5").isin("XXX", "YYY", "ZZZ"))

df.cache()

var array1 = Array("111", "222")
var array2 = Array("333")
var storage = Array(array1, array2)

if (!df.head(1).isEmpty) {
  for (item <- storage) {
    df.filter(
      col("col1").isin(item: _*)
    )
    println("count: " + df.count())
  }
}
"In fact, it must return two different count values."

Why? You are calling it on the same df. Maybe you meant something like:

val df1 = df.filter(...)
println("count: " + df1.count())
"I assumed that the caching process would run for a long time, and the other processes would use this cache and run quickly."

It does, but only when the first action that depends on this DataFrame is executed, and head is that action. So you should expect exactly what you observed:

"the program goes to this part of code if (!df.head(1).isEmpty) and performs it for a very long time"
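Note that cache() itself is lazy. If you would rather pay the caching cost up front, a common pattern (just a sketch, not something your code requires) is to trigger materialization with an eager action right after cache():

df.cache()

// count() scans every partition, so this first action materializes
// the entire cache; cache() alone computes nothing.
df.count()

// Subsequent actions, including head(1), now read from the cached data.
if (!df.head(1).isEmpty) {
  // ...
}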
Without caching, you'd also get the same execution time for both df.count() calls, unless Spark detects the repeated computation and enables caching on its own.
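If you want to confirm that the DataFrame really is cached (instead of guessing from the Mesos UI), you can inspect its storage level; this assumes Spark 2.1+, where Dataset.storageLevel is available:

import org.apache.spark.storage.StorageLevel

df.cache()
df.count() // first action: populates the cache

// cache() on a DataFrame uses MEMORY_AND_DISK by default,
// so this prints true once caching has taken effect.
println(df.storageLevel == StorageLevel.MEMORY_AND_DISK)

The Storage tab of the Spark UI shows the same information, including how many partitions are cached and where.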