I have a list of locations like (100, 101, 205, 310, etc.). I would like to store this list of locations in a location external to the Spark jobs that use this list as part of their query. Then, if the list is amended in any way, each Spark Job will not have to be touched individually, there will be a global place to edit the list and each separate job will pull from that global location.
For example:
Where location in (100, 101, 205, 310, etc.)
would be replaced by
Where location in ('path to global list of locations')
I have tried creating a separate .conf file to store these types of values/lists, but am unsure how to tie the new .conf file to all of the job.conf files.
So, this SO-question
Read files sent with spark-submit by the driver shows a few of the discussions around this topic. Not well understood, when I also talk to the data engineers I work with. E.g. local file system files with --files
, .conf
issues and all aspects of whether Cluster
vs. Client
mode and directories to be loaded to.
But if you look at the simple Seq of permitted filtering values approach, e.g. from https://sparkbyexamples.com/spark/spark-isin-is-not-in-operator-example/
val data = Seq(("James","Java"),("Michael","Spark"),("Robert","Python"))
import spark.implicits._
val df = data.toDF("name","language")
//Using isin() function to check value in list of values
val listValues = Seq("Java","Scala")
df.filter(df("language").isin(listValues:_*)).show()
//+-----+--------+
//| name|language|
//+-----+--------+
//|James| Java|
//+-----+--------+
my understanding is you can gen a Seq
for isin
comparison via a --files
approach with appropriate coding. Sure, but not what I am presenting.
These days I use the Databricks notebooks as I am tired of re-installing Hive Metastore and such, so I do not check the --files approach, and simply use the distributed file approach for the filtering values, as it is what you learn initially and easier:
I start from DS, with an RDD I get a problem I am not sure of, but RDD's are legacy, so not relevant. Code:
// You can use DF, DS filtering or SQL, but it does not work how you state in question. Substituting the SQL with Spark SQL can be done, but why not
// just use the below approach. As we have DF or SQL on tempview, that amount to the same thing.
// Can be done with or without broadcast. Here we do broadcast.
// 1. Get the filter location criteria. Note the format.
val ds = spark.read.textFile("/FileStore/tables/loc_include.txt")
ds.show()
// 2. Generate a Seq via Spark Driver collect and pass to the Workers / Executors.
val loc_include = ds.flatMap(_.split(",")).withColumn("valueInt",col("value").cast("int")).select("valueInt").distinct.map(_.getInt(0)).collect.toSeq
val broadcast_loc_include = sc.broadcast(loc_include)
// 3. Our data.
val df = Seq( (1, 2, 320), (2, 2, 300), (7, 6, 400), (1, 3, 650)).toDF("col1", "col2", "col3")
// 4a. Without broadcast.
val df2 = df.filter(df("col3").isin(loc_include:_*))
df2.show()
// 4b. With broadcast.
val df3 = df.filter(df("col3").isin(broadcast_loc_include.value:_*))
df3.show()
Input and results:
+-----------+
| value|
+-----------+
|100,200,300|
| 400|
+-----------+
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| 2| 300|
| 7| 6| 400|
+----+----+----+
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| 2| 300|
| 7| 6| 400|
+----+----+----+
In any event it does not work how you want, sort of include from a file.
Input was simulated.
Moreover, what's the purpose and usecase of --files in spark-submit? shows you some perspective as well.