Search code examples
scalamachine-learninglinear-regressionapache-flinkflinkml

Extracting weights from FlinkML Multiple Linear Regression


I am running the example multiple linear regression for Flink (0.10-SNAPSHOT). I can't figure out how to extract the weights (e.g. slope and intercept, beta0-beta1, what ever you want to call them). I'm not super seasoned in Scala, that is probably half my problem.

Thanks for any help any one can give.

object Job {
 def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data")

    val survivalLV = survival
      .map{tuple =>
      val list = tuple.productIterator.toList
      val numList = list.map(_.asInstanceOf[String].toDouble)
      LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
    }

    val mlr = MultipleLinearRegression()
      .setStepsize(1.0)
      .setIterations(100)
      .setConvergenceThreshold(0.001)

    mlr.fit(survivalLV) 
    println(mlr.toString())     // This doesn't do anything productive...
    println(mlr.weightsOption)  // Neither does this.

  }
}

Solution

  • The problem is that you've only constructed the Flink job (DAG) which will calculate the weights but it is not yet executed. The easiest way to trigger the execution is to use the collect method which will retrieve the result of the DataSet back to your client.

    mlr.fit(survivalLV)
    
    val weights = mlr.weightsOption match {
      case Some(weights) => weights.collect()
      case None => throw new Exception("Could not calculate the weights.")
    }
    
    println(weights)