scala> spark.version
res8: String = 2.2.0
I'm working with a Spark DataFrame that contains a column locationID. I've created an MLlib pipeline to build a linear regression model, and it works when I feed it data for a single locationID. I would now like to create a separate model for each locationID (there may be a few thousand locationIDs in production). I would like to save the model coefficients for each model.
I'm not sure how this can be done in Scala.
My pipeline is defined like this:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql
// Load the regression input data
val mydata = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./inputdata.csv")
// Create month one-hot encoding
val monthIndexer = new StringIndexer()
.setInputCol("month")
.setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
.setInputCol(monthIndexer.getOutputCol)
.setOutputCol("monthVec")
val assembler = new VectorAssembler()
.setInputCols(Array("monthVec","tran_adr"))
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val pipeline = new Pipeline()
.setStages(Array(monthIndexer, monthEncoder, assembler, lr))
// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)
I then am able to pull the model details like this:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
Now I want to group on the locationID column found in mydata and run the pipeline on each group of the data.
I've tried using groupBy, but I can only aggregate.
val grouped = mydata.groupBy("locationID")
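Since grouped is a RelationalGroupedDataset, only aggregations are available; for example, I can count rows per location, but there is no hook to fit a pipeline per group:
// Only aggregate operations are available on the grouped data
grouped.count().show()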
I've also tried pulling the unique locationID values as a list and looping through them:
val locationList = mydata.select(mydata("locationID")).distinct
// collect to the driver so println runs locally
locationList.collect.foreach(println)
I know that Spark is not ideal for creating many smaller models and is best suited to building one model on a large data set, but I've been tasked with doing this as a proof of concept.
What is the correct approach for doing something like this in Spark?
What is the correct approach for doing something like this in Spark?
I'll risk a claim that there is no good approach at all. There are many advanced tools that can handle in-core data processing and many task-scheduling libraries that can be used to orchestrate independent learning tasks. Spark doesn't offer anything here at all.
Its scheduling capabilities are mediocre, so are its ML / MLlib tools, and scaling and fault tolerance are of no use when each task is independent.
You could use Spark for general-purpose scheduling (this idea is implemented with sklearn keyed models, if you don't mind using Python), but that's it.
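That said, if a proof of concept is all that's required, a plain driver-side loop that reuses the pipeline from the question and fits it once per key will do. A sketch (the output path is just an example, and the loop is deliberately sequential; each fit is still a distributed Spark job):
import org.apache.spark.ml.regression.LinearRegressionModel
import spark.implicits._

// Collect the distinct keys to the driver
val locationIds = mydata
  .select("locationID")
  .distinct()
  .collect()
  .map(_.get(0))

// Fit one model per location; monthIndexer was fitted on the full
// dataset above, so month categories stay consistent across locations
val coefficientRows = locationIds.map { id =>
  val subset = mydata.filter(mydata("locationID") === id)
  val model = pipeline.fit(subset)
    .stages(3)
    .asInstanceOf[LinearRegressionModel]
  (id.toString, model.intercept, model.coefficients.toArray)
}

// One row of coefficients per locationID (example output path)
coefficientRows.toSeq
  .toDF("locationID", "intercept", "coefficients")
  .write.mode("overwrite")
  .parquet("./coefficients_by_location")
Just don't expect this to be fast or fault tolerant: each iteration is an independent job, and Spark gives you nothing to parallelize or recover across them.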