I'd like to perform k-means clustering on some data we have in Accumulo. My first thought was to use the k-means clustering in Apache Mahout, but I'm having a difficult time connecting the two without using temporary files. As near as I can tell, in order to use Mahout, I would need to write the Accumulo data out as a series of vector files stored in HDFS, run Mahout's clustering over those, and then write the results back into Accumulo (the Mahout entry points all seem to take paths to directories). Although I haven't tried it yet, that just seems like a performance nightmare. Is there a better way? Alternatively, are there other k-means clustering libraries that would have an easier time connecting to Accumulo? I'm looking into OpenCV now, but other suggestions are welcome.
As @FuriousGeorge suggested, I looked into Apache Spark. It does indeed provide a way to perform k-means clustering without using temporary files, like so:
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import scala.Tuple2;
public class ClusterAccumuloData
{
public static void main(String[] args)
{
JavaSparkContext sc = new JavaSparkContext("yarn-cluster",
"JobName",
"/spark/installation/directory",
"/path/to/jar/file/containing/this/class");
Configuration conf = new Configuration(); // As near as I can tell, this is all we need.
Authorizations auths = new Authorizations("whatever_you_need");
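// Note: setInputInfo/setZooKeeperInstance below are the static configuration
// calls from the Accumulo 1.4-era API; later Accumulo releases replaced them
// with setConnectorInfo, setInputTableName, setScanAuthorizations, etc.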
AccumuloInputFormat.setInputInfo(conf,
"accumulo_user",
"users_password".getBytes(),
"accumulo_table_name",
auths);
AccumuloInputFormat.setZooKeeperInstance(conf,
"accumulo_instance_name",
"zookeeper_server_1,zookeeper_server_2");
// Calls to other AccumuloInputFormat functions (such as setRanges or addIterator)
// that configure it to retrieve the data you wish to cluster.
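// newAPIHadoopRDD plugs the Hadoop InputFormat directly into Spark, yielding
// one (Key, Value) pair per Accumulo entry with no intermediate files.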
JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(conf,
AccumuloInputFormat.class,
Key.class,
Value.class);
JavaRDD<Vector> kmeansDataRDD =
accumuloRDD.map(new Function<Tuple2<Key, Value>, Vector>()
{
public Vector call(Tuple2<Key, Value> accumuloData)
{
// Code which transforms accumuloData into either a
// DenseVector or a SparseVector, then returns that Vector.
}
});
// Example parameters: k = 42 clusters, at most 14 iterations, 37 parallel runs.
KMeansModel kmm = KMeans.train(JavaRDD.toRDD(kmeansDataRDD), 42, 14, 37);
}
}
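For reference, here is one possible shape for that transformation function, written as a standalone class rather than an anonymous one. It assumes, purely for illustration, that each Value holds a UTF-8 string of comma-separated doubles; the class name ValueToVector and the parsing logic are hypothetical and would need to match your actual table layout:

import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import scala.Tuple2;
public class ValueToVector implements Function<Tuple2<Key, Value>, Vector>
{
// Hypothetical format: each Value is a UTF-8 string such as "1.0,2.5,3.7".
public Vector call(Tuple2<Key, Value> accumuloData)
{
String[] fields = new String(accumuloData._2().get(), StandardCharsets.UTF_8).split(",");
double[] features = new double[fields.length];
for (int i = 0; i < fields.length; i++)
{
features[i] = Double.parseDouble(fields[i]);
}
return Vectors.dense(features); // produces a DenseVector; use Vectors.sparse for sparse data
}
}

An instance of this class can then be passed to accumuloRDD.map(...) in place of the anonymous class above.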