I would like to use the Flink HBase addon to read out data that then serves as input for Flink's machine learning algorithms, namely SVM and MLR. Right now I first write the extracted data to a temporary file and then read it back in via the libSVM method, but I guess there should be a more sophisticated way.
Do you have a code snippet or an idea how to do so?
There is no need to write the data to disk and then read it back with MLUtils.readLibSVM. The reason is the following: MLUtils.readLibSVM expects a text file where each line is a sparse feature vector with its associated label. It uses the following format to represent the label-feature vector pair:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
Where <feature> is the index of the subsequent <value> in the feature vector. MLUtils.readLibSVM can read files with this format and converts each line into a LabeledVector instance. Thus, you obtain a DataSet[LabeledVector] after reading a libSVM file. And this is exactly the input format required by the SVM and MultipleLinearRegression predictors.
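For illustration, a single line in this format can be parsed with plain Scala, independent of Flink. The helper name parseLibSVMLine below is hypothetical and not part of MLUtils; it just makes the format rules (comment stripping, 1-based indices) concrete:

```scala
// Hypothetical helper illustrating the libSVM line format; not part of Flink's MLUtils.
def parseLibSVMLine(line: String): Option[(Double, Array[(Int, Double)])] = {
  // strip trailing comments introduced by '#'
  val commentFree = line.takeWhile(_ != '#').trim

  if (commentFree.isEmpty) None
  else {
    val tokens = commentFree.split(' ')
    val label = tokens.head.toDouble
    val features = tokens.tail.map { entry =>
      val Array(index, value) = entry.split(':')
      // libSVM indices are 1-based; convert them to 0-based
      (index.toInt - 1, value.toDouble)
    }
    Some((label, features))
  }
}

// "1 1:0.5 3:2.0 # a comment" -> label 1.0, features at 0-based indices 0 and 2
```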
However, depending on the data format you obtain from HBase, you would first have to convert that data into the libSVM format; otherwise MLUtils.readLibSVM won't be able to read the written file. And if you convert the data anyway, then you can just as well convert it directly into a DataSet[LabeledVector] and use that as the input for Flink's ML algorithms. This avoids an unnecessary round trip through disk.
If you obtain a DataSet[String] from HBase, where each string has the libSVM format (see the specification above), then you can apply a flatMap operation on the HBase DataSet with the following function.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector

val hbaseInput: DataSet[String] = ...

val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if (commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2,
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM indices are 1-based, but we expect them to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }
}
// Calculate the maximum dimension of the feature vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map(_._1 + 1).max
}.reduce(scala.math.max(_, _))
// name of the broadcast set carrying the vector dimension
val DIMENSION = "dimension"

val labeledVectors: DataSet[LabeledVector] =
  labelCOODS.map {
    new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
      var dimension = 0

      override def open(configuration: Configuration): Unit = {
        dimension = getRuntimeContext.getBroadcastVariable[Int](DIMENSION).get(0)
      }

      override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
        new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
      }
    }
  }.withBroadcastSet(dimensionDS, DIMENSION)
This will convert your libSVM format data into a DataSet of LabeledVector instances.
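The resulting data set can then be fed directly to the learners. As a rough sketch, assuming env is your ExecutionEnvironment and the default Flink ML SVM parameters (the setBlocks/setIterations values here are just example settings to tune for your data):

```scala
import org.apache.flink.ml.classification.SVM

// Train Flink ML's SVM directly on the converted data set,
// without any intermediate file on disk.
val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)

svm.fit(labeledVectors)
```

MultipleLinearRegression can be trained the same way, since both predictors accept a DataSet[LabeledVector] as training input.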