Search code examples
apache-sparkapache-spark-sqlk-meansapache-spark-ml

Cluster centers have different dimensionality in Spark MLlib


I use Spark 2.0.2. I have data which is partitioned by day. I want to cluster the different partitions independent from each other and than compare the cluster centers (calculate distance between them) to see, how the clusters would change over time.

I do exactly the same preprocessing (scaling, one hot encoding, etc.) for every partition. I use a predefined pipeline for this which works perfectly in "normal" learning and prediction contexts. But when I want to calculate the distance between the cluster centers, the corresponding vectors of the different partitions have a different size (different dimensionality).

Some code snippets:

The preprocessing pipeline is built like this:

val protoIndexer = new StringIndexer().setInputCol("protocol").setOutputCol("protocolIndexed").setHandleInvalid("skip")
val serviceIndexer = new StringIndexer().setInputCol("service").setOutputCol("serviceIndexed").setHandleInvalid("skip")
val directionIndexer = new StringIndexer().setInputCol("direction").setOutputCol("directionIndexed").setHandleInvalid("skip")

val protoEncoder = new OneHotEncoder().setInputCol("protocolIndexed").setOutputCol("protocolEncoded")
val serviceEncoder = new OneHotEncoder().setInputCol("serviceIndexed").setOutputCol("serviceEncoded")
val directionEncoder = new OneHotEncoder().setInputCol("directionIndexed").setOutputCol("directionEncoded")

val scaleAssembler = new VectorAssembler().setInputCols(Array("duration", "bytes", "packets", "tos", "host_count", "srv_count")).setOutputCol("scalableFeatures")
val scaler = new StandardScaler().setInputCol("scalableFeatures").setOutputCol("scaledFeatures")
val featureAssembler = new VectorAssembler().setInputCols(Array("scaledFeatures", "protocolEncoded", "urgent", "ack", "psh", "rst", "syn", "fin", "serviceEncoded", "directionEncoded")).setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(protoIndexer, protoEncoder, serviceIndexer, serviceEncoder, directionIndexer, directionEncoder, scaleAssembler, scaler, featureAssembler))
pipeline.write.overwrite().save(config.getString("pipeline"))

Define k-means, load predefined preprocessing-pipeline, add k-means to the pipeline:

val kmeans = new KMeans().setK(40).setTol(1.0e-6).setFeaturesCol("features")
val pipelineStages = Pipeline.load(config.getString("pipeline")).getStages
val pipeline = new Pipeline().setStages(pipelineStages ++ Array(kmeans))

Load data partitions, calculate features, fit pipeline, get k-means model and show size of the first cluster center as example:

(1 to 7 by 1).map { day =>
  val data = sparkContext.textFile("path/to/data/" + day + "/")
  val rawFeatures = data.map(extractFeatures....).toDF(featureHeaders: _*)
  val model = pipeline.fit(rawFeatures)

  val kmeansModel = model.stages(model.stages.size - 1).asInstanceOf[KMeansModel]
  println(kmeansModel.clusterCenters(0).size)
}

For the different partitions, the cluster centers have different dimensions (But the same for every of the 40 clusters within a partition). So I can't calculate the distance between them. I would suspect that they are all of equal size (namely the size of my euclidean space which is 13, because I have 13 features). But it gives my weird numbers which I don't understand.

I saved the extracted feature vectors to a file to check them. Their format is as suspected. Every feature is present.

Any ideas what I'm doing wrong or if I have a misunderstanding? Thank you!


Solution

  • Skipping over the fact that KMeans is not a good choice for processing categorical data your code doesn't guarantee:

    • The same index - feature relationship between batches. StringIndexer assigns labels following frequencies. The most common string is encoded as 0, the least common one as a numLabels - 1.
    • The same number of inidces between batches, and as a result the same shape of one-hot-encoded and assembled vectors. Size of the vector is equal to the number of unique labels adjusted depending on the value of dropLast parameter in OneHotEncoder.

    In consequence encoded vectors may have different dimensions and interpretation from batch to batch.

    If you want consistent encoding you'll need persistent dictionary mapping which ensure consistent indexing between batches.