java hadoop cluster-analysis mahout k-means

IO Exception while running K-means clustering using mahout and hadoop jar


I am trying to run a clustering program using Mahout. The following is the Java code I am using:

package com;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class ClusteringDemo {

    public static final double[][] points = { { 1, 1 }, { 2, 1 }, { 1, 2 },
            { 2, 2 }, { 3, 3 }, { 8, 8 }, { 9, 8 }, { 8, 9 }, { 9, 9 } };

    public static void writePointsToFile(List<Vector> points, String fileName,
            FileSystem fs, Configuration conf) throws IOException {
        Path path = new Path(fileName);
        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
                LongWritable.class, VectorWritable.class);
        long recNum = 0;
        VectorWritable vec = new VectorWritable();
        for (Vector point : points) {
            vec.set(point);
            writer.append(new LongWritable(recNum++), vec);
        }
        writer.close();
    }

    public static List<Vector> getPoints(double[][] raw) {
        List<Vector> points = new ArrayList<Vector>();
        for (int i = 0; i < raw.length; i++) {
            double[] fr = raw[i];
            Vector vec = new RandomAccessSparseVector(fr.length);
            vec.assign(fr);
            points.add(vec);
        }
        return points;
    }

    public static void main(String args[]) throws Exception {
        int k = 3;
        List<Vector> vectors = getPoints(points);
        File testData = new File("/home/vishal/testdata");
        if (!testData.exists()) {
            testData.mkdir();
        }
        testData = new File("/home/vishal/testdata/points");
        if (!testData.exists()) {
            testData.mkdir();
        }
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        writePointsToFile(vectors, "/home/vishal/testdata/points/file1", fs,
                conf);

        Path path = new Path("/home/vishal/testdata/clusters/part-00000");
        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
                Text.class, Cluster.class);
        for (int i = 0; i < k; i++) {
            Vector vec = vectors.get(i);
            Cluster cluster = new Cluster(vec, i,
                    new EuclideanDistanceMeasure());
            writer.append(new Text(cluster.getIdentifier()), cluster);

        }
        writer.close();
        KMeansDriver.run(conf, new Path("/home/vishal/testdata/points"),
                new Path("/home/vishal/testdata/clusters"), new Path(
                        "/home/vishal/output"), new EuclideanDistanceMeasure(),
                0.001, 10, true, false);

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
                "/home/vishal/output/" + Cluster.CLUSTERED_POINTS_DIR
                        + "/part-m-00000"), conf);
        IntWritable key = new IntWritable();
        WeightedVectorWritable value = new WeightedVectorWritable();
        while (reader.next(key, value)) {
            System.out.println(value.toString() + " belongs to cluster "
                    + key.toString());
        }
        reader.close();

    }

}

When I run it, it starts executing normally but fails with an error at the end. The following is the output and stack trace I get:

13/05/30 09:49:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/05/30 09:49:22 INFO kmeans.KMeansDriver: Input: /home/vishal/testdata/points Clusters In: /home/vishal/testdata/clusters Out: /home/vishal/output Distance: org.apache.mahout.common.distance.EuclideanDistanceMeasure
13/05/30 09:49:22 INFO kmeans.KMeansDriver: convergence: 0.0010 max Iterations: 10 num Reduce Tasks: org.apache.mahout.math.VectorWritable Input Vectors: {}
13/05/30 09:49:22 INFO kmeans.KMeansDriver: K-Means Iteration 1
13/05/30 09:49:22 INFO common.HadoopUtil: Deleting /home/vishal/output/clusters-1
13/05/30 09:49:23 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/05/30 09:49:23 INFO input.FileInputFormat: Total input paths to process : 1
13/05/30 09:49:23 INFO mapred.JobClient: Running job: job_local_0001
13/05/30 09:49:23 INFO util.ProcessTree: setsid exited with exit code 0
13/05/30 09:49:23 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@15fc40c
13/05/30 09:49:23 INFO mapred.MapTask: io.sort.mb = 100
13/05/30 09:49:23 INFO mapred.MapTask: data buffer = 79691776/99614720
13/05/30 09:49:23 INFO mapred.MapTask: record buffer = 262144/327680
13/05/30 09:49:23 INFO mapred.MapTask: Starting flush of map output
13/05/30 09:49:23 INFO mapred.MapTask: Finished spill 0
13/05/30 09:49:23 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/05/30 09:49:24 INFO mapred.JobClient:  map 0% reduce 0%
13/05/30 09:49:26 INFO mapred.LocalJobRunner: 
13/05/30 09:49:26 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/05/30 09:49:26 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@15ed659
13/05/30 09:49:26 INFO mapred.LocalJobRunner: 
13/05/30 09:49:26 INFO mapred.Merger: Merging 1 sorted segments
13/05/30 09:49:26 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 185 bytes
13/05/30 09:49:26 INFO mapred.LocalJobRunner: 
13/05/30 09:49:26 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/05/30 09:49:26 INFO mapred.LocalJobRunner: 
13/05/30 09:49:26 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/05/30 09:49:26 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to /home/vishal/output/clusters-1
13/05/30 09:49:27 INFO mapred.JobClient:  map 100% reduce 0%
13/05/30 09:49:29 INFO mapred.LocalJobRunner: reduce > reduce
13/05/30 09:49:29 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/05/30 09:49:30 INFO mapred.JobClient:  map 100% reduce 100%
13/05/30 09:49:30 INFO mapred.JobClient: Job complete: job_local_0001
13/05/30 09:49:30 INFO mapred.JobClient: Counters: 21
13/05/30 09:49:30 INFO mapred.JobClient:   File Output Format Counters 
13/05/30 09:49:30 INFO mapred.JobClient:     Bytes Written=474
13/05/30 09:49:30 INFO mapred.JobClient:   Clustering
13/05/30 09:49:30 INFO mapred.JobClient:     Converged Clusters=1
13/05/30 09:49:30 INFO mapred.JobClient:   FileSystemCounters
13/05/30 09:49:30 INFO mapred.JobClient:     FILE_BYTES_READ=3328461
13/05/30 09:49:30 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=3422872
13/05/30 09:49:30 INFO mapred.JobClient:   File Input Format Counters 
13/05/30 09:49:30 INFO mapred.JobClient:     Bytes Read=443
13/05/30 09:49:30 INFO mapred.JobClient:   Map-Reduce Framework
13/05/30 09:49:30 INFO mapred.JobClient:     Map output materialized bytes=189
13/05/30 09:49:30 INFO mapred.JobClient:     Map input records=9
13/05/30 09:49:30 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/05/30 09:49:30 INFO mapred.JobClient:     Spilled Records=6
13/05/30 09:49:30 INFO mapred.JobClient:     Map output bytes=531
13/05/30 09:49:30 INFO mapred.JobClient:     Total committed heap usage (bytes)=325713920
13/05/30 09:49:30 INFO mapred.JobClient:     CPU time spent (ms)=0
13/05/30 09:49:30 INFO mapred.JobClient:     SPLIT_RAW_BYTES=104
13/05/30 09:49:30 INFO mapred.JobClient:     Combine input records=9
13/05/30 09:49:30 INFO mapred.JobClient:     Reduce input records=3
13/05/30 09:49:30 INFO mapred.JobClient:     Reduce input groups=3
13/05/30 09:49:30 INFO mapred.JobClient:     Combine output records=3
13/05/30 09:49:30 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/05/30 09:49:30 INFO mapred.JobClient:     Reduce output records=3
13/05/30 09:49:30 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/05/30 09:49:30 INFO mapred.JobClient:     Map output records=9
13/05/30 09:49:30 INFO kmeans.KMeansDriver: K-Means Iteration 2
13/05/30 09:49:30 INFO common.HadoopUtil: Deleting /home/vishal/output/clusters-2
13/05/30 09:49:30 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/05/30 09:49:30 INFO input.FileInputFormat: Total input paths to process : 1
13/05/30 09:49:30 INFO mapred.JobClient: Running job: job_local_0002
13/05/30 09:49:30 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@13f136e
13/05/30 09:49:30 INFO mapred.MapTask: io.sort.mb = 100
13/05/30 09:49:30 INFO mapred.MapTask: data buffer = 79691776/99614720
13/05/30 09:49:30 INFO mapred.MapTask: record buffer = 262144/327680
13/05/30 09:49:30 INFO mapred.MapTask: Starting flush of map output
13/05/30 09:49:30 INFO mapred.MapTask: Finished spill 0
13/05/30 09:49:30 INFO mapred.Task: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
13/05/30 09:49:31 INFO mapred.JobClient:  map 0% reduce 0%
13/05/30 09:49:33 INFO mapred.LocalJobRunner: 
13/05/30 09:49:33 INFO mapred.Task: Task 'attempt_local_0002_m_000000_0' done.
13/05/30 09:49:33 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@d6b059
13/05/30 09:49:33 INFO mapred.LocalJobRunner: 
13/05/30 09:49:33 INFO mapred.Merger: Merging 1 sorted segments
13/05/30 09:49:33 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 124 bytes
13/05/30 09:49:33 INFO mapred.LocalJobRunner: 
13/05/30 09:49:33 INFO mapred.Task: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
13/05/30 09:49:33 INFO mapred.LocalJobRunner: 
13/05/30 09:49:33 INFO mapred.Task: Task attempt_local_0002_r_000000_0 is allowed to commit now
13/05/30 09:49:33 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0002_r_000000_0' to /home/vishal/output/clusters-2
13/05/30 09:49:34 INFO mapred.JobClient:  map 100% reduce 0%
13/05/30 09:49:36 INFO mapred.LocalJobRunner: reduce > reduce
13/05/30 09:49:36 INFO mapred.Task: Task 'attempt_local_0002_r_000000_0' done.
13/05/30 09:49:37 INFO mapred.JobClient:  map 100% reduce 100%
13/05/30 09:49:37 INFO mapred.JobClient: Job complete: job_local_0002
13/05/30 09:49:37 INFO mapred.JobClient: Counters: 20
13/05/30 09:49:37 INFO mapred.JobClient:   File Output Format Counters 
13/05/30 09:49:37 INFO mapred.JobClient:     Bytes Written=364
13/05/30 09:49:37 INFO mapred.JobClient:   FileSystemCounters
13/05/30 09:49:37 INFO mapred.JobClient:     FILE_BYTES_READ=6658544
13/05/30 09:49:37 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=6844248
13/05/30 09:49:37 INFO mapred.JobClient:   File Input Format Counters 
13/05/30 09:49:37 INFO mapred.JobClient:     Bytes Read=443
13/05/30 09:49:37 INFO mapred.JobClient:   Map-Reduce Framework
13/05/30 09:49:37 INFO mapred.JobClient:     Map output materialized bytes=128
13/05/30 09:49:37 INFO mapred.JobClient:     Map input records=9
13/05/30 09:49:37 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/05/30 09:49:37 INFO mapred.JobClient:     Spilled Records=4
13/05/30 09:49:37 INFO mapred.JobClient:     Map output bytes=531
13/05/30 09:49:37 INFO mapred.JobClient:     Total committed heap usage (bytes)=525074432
13/05/30 09:49:37 INFO mapred.JobClient:     CPU time spent (ms)=0
13/05/30 09:49:37 INFO mapred.JobClient:     SPLIT_RAW_BYTES=104
13/05/30 09:49:37 INFO mapred.JobClient:     Combine input records=9
13/05/30 09:49:37 INFO mapred.JobClient:     Reduce input records=2
13/05/30 09:49:37 INFO mapred.JobClient:     Reduce input groups=2
13/05/30 09:49:37 INFO mapred.JobClient:     Combine output records=2
13/05/30 09:49:37 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/05/30 09:49:37 INFO mapred.JobClient:     Reduce output records=2
13/05/30 09:49:37 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/05/30 09:49:37 INFO mapred.JobClient:     Map output records=9
13/05/30 09:49:37 INFO kmeans.KMeansDriver: K-Means Iteration 3
13/05/30 09:49:37 INFO common.HadoopUtil: Deleting /home/vishal/output/clusters-3
13/05/30 09:49:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/05/30 09:49:37 INFO input.FileInputFormat: Total input paths to process : 1
13/05/30 09:49:37 INFO mapred.JobClient: Running job: job_local_0003
13/05/30 09:49:37 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@988707
13/05/30 09:49:37 INFO mapred.MapTask: io.sort.mb = 100
13/05/30 09:49:37 INFO mapred.MapTask: data buffer = 79691776/99614720
13/05/30 09:49:37 INFO mapred.MapTask: record buffer = 262144/327680
13/05/30 09:49:37 INFO mapred.MapTask: Starting flush of map output
13/05/30 09:49:37 INFO mapred.MapTask: Finished spill 0
13/05/30 09:49:37 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/05/30 09:49:38 INFO mapred.JobClient:  map 0% reduce 0%
13/05/30 09:49:40 INFO mapred.LocalJobRunner: 
13/05/30 09:49:40 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
13/05/30 09:49:40 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6214f5
13/05/30 09:49:40 INFO mapred.LocalJobRunner: 
13/05/30 09:49:40 INFO mapred.Merger: Merging 1 sorted segments
13/05/30 09:49:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 124 bytes
13/05/30 09:49:40 INFO mapred.LocalJobRunner: 
13/05/30 09:49:40 INFO mapred.Task: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
13/05/30 09:49:40 INFO mapred.LocalJobRunner: 
13/05/30 09:49:40 INFO mapred.Task: Task attempt_local_0003_r_000000_0 is allowed to commit now
13/05/30 09:49:40 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0003_r_000000_0' to /home/vishal/output/clusters-3
13/05/30 09:49:41 INFO mapred.JobClient:  map 100% reduce 0%
13/05/30 09:49:43 INFO mapred.LocalJobRunner: reduce > reduce
13/05/30 09:49:43 INFO mapred.Task: Task 'attempt_local_0003_r_000000_0' done.
13/05/30 09:49:44 INFO mapred.JobClient:  map 100% reduce 100%
13/05/30 09:49:44 INFO mapred.JobClient: Job complete: job_local_0003
13/05/30 09:49:44 INFO mapred.JobClient: Counters: 21
13/05/30 09:49:44 INFO mapred.JobClient:   File Output Format Counters 
13/05/30 09:49:44 INFO mapred.JobClient:     Bytes Written=364
13/05/30 09:49:44 INFO mapred.JobClient:   Clustering
13/05/30 09:49:44 INFO mapred.JobClient:     Converged Clusters=2
13/05/30 09:49:44 INFO mapred.JobClient:   FileSystemCounters
13/05/30 09:49:44 INFO mapred.JobClient:     FILE_BYTES_READ=9988052
13/05/30 09:49:44 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=10265506
13/05/30 09:49:44 INFO mapred.JobClient:   File Input Format Counters 
13/05/30 09:49:44 INFO mapred.JobClient:     Bytes Read=443
13/05/30 09:49:44 INFO mapred.JobClient:   Map-Reduce Framework
13/05/30 09:49:44 INFO mapred.JobClient:     Map output materialized bytes=128
13/05/30 09:49:44 INFO mapred.JobClient:     Map input records=9
13/05/30 09:49:44 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/05/30 09:49:44 INFO mapred.JobClient:     Spilled Records=4
13/05/30 09:49:44 INFO mapred.JobClient:     Map output bytes=531
13/05/30 09:49:44 INFO mapred.JobClient:     Total committed heap usage (bytes)=724434944
13/05/30 09:49:44 INFO mapred.JobClient:     CPU time spent (ms)=0
13/05/30 09:49:44 INFO mapred.JobClient:     SPLIT_RAW_BYTES=104
13/05/30 09:49:44 INFO mapred.JobClient:     Combine input records=9
13/05/30 09:49:44 INFO mapred.JobClient:     Reduce input records=2
13/05/30 09:49:44 INFO mapred.JobClient:     Reduce input groups=2
13/05/30 09:49:44 INFO mapred.JobClient:     Combine output records=2
13/05/30 09:49:44 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/05/30 09:49:44 INFO mapred.JobClient:     Reduce output records=2
13/05/30 09:49:44 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/05/30 09:49:44 INFO mapred.JobClient:     Map output records=9
Exception in thread "main" java.io.IOException: Target /home/vishal/output/clusters-3-final/clusters-3 is a directory
    at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:359)
    at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:361)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:211)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:163)
    at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:287)
    at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:425)
    at org.apache.mahout.clustering.kmeans.KMeansDriver.buildClustersMR(KMeansDriver.java:322)
    at org.apache.mahout.clustering.kmeans.KMeansDriver.buildClusters(KMeansDriver.java:239)
    at org.apache.mahout.clustering.kmeans.KMeansDriver.run(KMeansDriver.java:154)
    at com.ClusteringDemo.main(ClusteringDemo.java:80)

What could be the reason?

Thanks


Solution

  • Here is what KMeansDriver is trying to do:

    Path finalClustersIn = new Path(output, AbstractCluster.CLUSTERS_DIR + (iteration-1) + "-final");
    FileSystem.get(conf).rename(new Path(output, AbstractCluster.CLUSTERS_DIR + (iteration-1)), finalClustersIn);
    

    As you can see, it has converged after 3 iterations and is trying to rename the directory clusters-3, which holds the result of the 3rd iteration, to clusters-3-final to mark it as the final result.

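    Before actually renaming, the rename method of FileSystem checks that it is not about to overwrite a directory that already exists. Your stack trace shows exactly that check failing: the local file system falls back to FileUtil.copy, and checkDest rejects the target because /home/vishal/output/clusters-3-final/clusters-3 is already a directory. So it looks like you already have the directory clusters-3-final, probably from a previous run.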

    Removing this directory should fix your issue. You can do it from the command line with:

    hadoop fs -rmr /home/vishal/output/clusters-3-final
    

    Or since it looks like you're running your job in local mode:

    rm -rf /home/vishal/output/clusters-3-final
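
The same local-mode cleanup can also be done from Java before calling KMeansDriver.run. The following is only a minimal sketch using plain java.nio; the class name StaleOutputCleaner and the method deleteRecursively are hypothetical names chosen for illustration and are not part of Mahout or Hadoop. When working through a Hadoop FileSystem instead of the local disk, the usual equivalent would be fs.delete(path, true).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Mahout or Hadoop): removes a stale local output directory.
public class StaleOutputCleaner {

    /** Recursively deletes dir if it exists; returns true if something was removed. */
    public static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directory.
            entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: StaleOutputCleaner <directory>");
            return;
        }
        Path stale = Paths.get(args[0]);
        System.out.println(deleteRecursively(stale) ? "Deleted " + stale : "Nothing to delete at " + stale);
    }
}
```

Run with /home/vishal/output/clusters-3-final as the argument, this would have the same effect as the rm -rf command above.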
    

    To avoid this kind of issue, I would recommend using a unique output directory every time you run your analysis. For example, you could append the current time, obtained with System.currentTimeMillis(), to the name of your output Path.
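
As a rough sketch of the unique-directory idea, the helper below appends a run identifier to a base directory name. The class name UniqueOutputDir and the method forRun are hypothetical names introduced here for illustration; only System.currentTimeMillis() and the /home/vishal/output path come from the discussion above.

```java
// Hypothetical helper (not part of Mahout or Hadoop): builds a per-run output directory name.
public class UniqueOutputDir {

    /** Appends "-" plus runId to baseDir, ignoring a single trailing slash. */
    public static String forRun(String baseDir, long runId) {
        // Strip one trailing slash so "/out/" and "/out" produce the same name.
        String base = (baseDir.length() > 1 && baseDir.endsWith("/"))
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return base + "-" + runId;
    }

    public static void main(String[] args) {
        // Using the current time as the run id gives a different directory on each run.
        String outputDir = forRun("/home/vishal/output", System.currentTimeMillis());
        System.out.println(outputDir);
    }
}
```

In the program from the question, the resulting string would replace both uses of "/home/vishal/output": the output Path passed to KMeansDriver.run and the path the SequenceFile.Reader is opened on.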

    EDIT: For your second issue about:

    Exception in thread "main" java.io.IOException: wrong value class: 0.0: null is not class org.apache.mahout.clustering.WeightedPropertyVectorWritable
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1932)
        at com.ClusteringDemo.main(ClusteringDemo.java:90)
    

    you are actually hitting a mismatch between Mahout versions: older versions use WeightedVectorWritable for the clustered points, while more recent ones use WeightedPropertyVectorWritable. To fix it, change the declaration of your value variable (and update the import to match) from:

    WeightedVectorWritable value = new WeightedVectorWritable();
    

    to:

    WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();