
Flink HBase input for machine learning algorithms


I would like to use the Flink-HBase addon to read data that then serves as input for Flink's machine learning algorithms, namely SVM and MLR. Right now I first write the extracted data to a temporary file and then read it back in via the libSVM method, but I guess there should be a more sophisticated way.

Do you have a code snippet or an idea how to do so?


Solution

  • There is no need to write the data to disk and then read it with MLUtils.readLibSVM, for the following reason.

    MLUtils.readLibSVM expects a text file where each line is the sparse feature vector with its associated label. It uses the following format to represent the label-feature vector pair:

    <line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
    

    Where <feature> is the index of the subsequent value in the feature vector. MLUtils.readLibSVM can read files with this format and converts each line into a LabeledVector instance. Thus, you obtain a DataSet[LabeledVector] after reading a libSVM file. And this is exactly the input format you need for the SVM and MultipleLinearRegression predictors.
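
    For illustration, a line in this format might look like the following (a made-up sample, not taken from the question):

```
1.0 1:0.5 3:2.0 # a comment
```

    It encodes the label 1.0 together with a sparse vector whose first and third components are 0.5 and 2.0 (libSVM indices are 1-based).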

    However, depending on the data format you obtain from HBase, you first have to convert the data into the libSVM format; otherwise, MLUtils.readLibSVM won't be able to read the written file. And if you have to convert the data anyway, then you can just as well convert it directly to a DataSet[LabeledVector] and use that as the input for Flink's ML algorithms. This avoids an unnecessary disk round trip.
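
    If your HBase rows do not already come as libSVM strings, the direct conversion can start from whatever row representation the HBase input format gives you. A minimal, plain-Scala sketch, assuming a hypothetical schema where each row arrives as a map with the label in a "label" column and feature values in columns "f0", "f1", ... (this schema is an assumption for illustration, not from the question):

```scala
// Hypothetical row schema: the "label" column holds the label, columns
// "f0", "f1", ... hold the feature values (0-based indices).
def rowToLabeledEntry(row: Map[String, String]): (Double, Array[(Int, Double)]) = {
  val label = row("label").toDouble
  val features = row.collect {
    case (key, value) if key.startsWith("f") =>
      (key.drop(1).toInt, value.toDouble) // column "f3" -> index 3
  }.toArray.sortBy(_._1)
  (label, features)
}

val entry = rowToLabeledEntry(Map("label" -> "1.0", "f0" -> "0.5", "f2" -> "2.0"))
// entry: (1.0, Array((0, 0.5), (2, 2.0)))
```

    Applying such a function in a map over the HBase DataSet yields the (label, COO entries) pairs that the dimension calculation and LabeledVector construction steps consume.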

    If you obtain from HBase a DataSet[String] where each string has the libSVM format (see the specification above), then you can apply a flatMap operation on the HBase DataSet with the following function.

    val hbaseInput: DataSet[String] = ...
    val labelCOODS = hbaseInput.flatMap {
      line =>
        // remove all comments which start with a '#'
        val commentFreeLine = line.takeWhile(_ != '#').trim
    
        if(commentFreeLine.nonEmpty) {
          val splits = commentFreeLine.split(' ')
          val label = splits.head.toDouble
          val sparseFeatures = splits.tail
          val coos = sparseFeatures.map {
            str =>
              val pair = str.split(':')
              require(
                pair.length == 2, 
                "Each feature entry has to have the form <feature>:<value>")
    
              // libSVM index is 1-based, but we expect it to be 0-based
              val index = pair(0).toInt - 1
              val value = pair(1).toDouble
    
              (index, value)
          }
    
          Some((label, coos))
        } else {
          None
        }
    }
    
    // Calculate maximum dimension of vectors
    val dimensionDS = labelCOODS.map {
      labelCOO =>
        labelCOO._2.map( _._1 + 1 ).max
    }.reduce(scala.math.max(_, _))
    
    // Name of the broadcast variable that carries the vector dimension
    val DIMENSION = "dimension"

    val labeledVectors: DataSet[LabeledVector] =
      labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
      var dimension = 0
    
      override def open(configuration: Configuration): Unit = {
        dimension = getRuntimeContext.getBroadcastVariable[Int](DIMENSION).get(0)
      }
    
      override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
        new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
      }
    }}.withBroadcastSet(dimensionDS, DIMENSION)
    

    This will convert your libSVM format data into a data set of LabeledVectors.
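
    Once you have the DataSet[LabeledVector], it can be passed straight to the predictors. A minimal sketch of fitting the SVM and MultipleLinearRegression predictors (all hyperparameter values below are arbitrary illustrations, not recommendations, and labeledVectors refers to the data set produced above):

```scala
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.regression.MultipleLinearRegression

// Fit an SVM on the converted data; the parameter values are placeholders.
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

svm.fit(labeledVectors)

// The same DataSet[LabeledVector] also serves as input for MLR.
val mlr = MultipleLinearRegression()
  .setIterations(50)
  .setStepsize(0.5)

mlr.fit(labeledVectors)
```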