Tags: java, mapreduce, hamming-distance

Using the Hamming distance algorithm to create an index for a fuzzy join with MapReduce


I am new to Java and MapReduce, and I am trying to write a MapReduce program that reads a list of words called "dictionary" and uses the Hamming distance algorithm to generate, for each word, all the words in the list at distance 1. I am able to generate the output, but the approach seems very inefficient: I load the entire list into an ArrayList, and for each word I invoke the Hamming distance computation in the map method, so I read the entire list twice and run the Hamming distance algorithm n*n times, where n is the number of words in the list.

Could you please suggest a more efficient way to do it?

Here is the code; there is no reducer in it as of now.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoin {


    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Dictionary words loaded from the distributed cache in setup().
        private List<String> Lst = new ArrayList<String>();


        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());

            for (Path p : files) {
                if (p.getName().equals("dictionary.txt")) {
                    BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                    String line = reader.readLine();
                    while (line != null) {
                        Lst.add(line);
                        line = reader.readLine();
                    }
                    reader.close();
                }
            }
            if (Lst.isEmpty()) {
                throw new IOException("Unable to load dictionary data.");
            }
        }


        public void map(LongWritable key, Text val, Context con)
                throws IOException, InterruptedException {

            String line1 = val.toString();
            StringTokenizer itr = new StringTokenizer(line1.toLowerCase());
            while (itr.hasMoreTokens()) {
                String key1 = itr.nextToken();
                // Emit each token with its distance-1 matches from the dictionary.
                String fnlstr = HammingDist(key1);
                con.write(new Text(key1), new Text(fnlstr));
            }
        }


        // Returns a comma-prefixed list of all dictionary words within
        // Hamming distance 1 of ky (a length difference also counts as mismatches).
        private String HammingDist(String ky) {
            String result = "";
            for (String x : Lst) {
                char[] s1 = ky.toCharArray();
                char[] s2 = x.toCharArray();

                int shorter = Math.min(s1.length, s2.length);
                int longer = Math.max(s1.length, s2.length);

                int distance = 0;
                for (int i = 0; i < shorter; i++) {
                    if (s1[i] != s2[i]) distance++;
                }
                distance += longer - shorter;

                if (distance < 2) {
                    result = result + "," + x;
                }
            }
            return result; // never null, so the original null check was dead code
        }
    }

  public static void main(String[] args) 
                  throws IOException, ClassNotFoundException, InterruptedException {

    Job job = new Job();
    job.setJarByClass(MapJoin.class);
    job.setJobName("MapJoin");
    job.setNumReduceTasks(0); // map-only job: no reducer yet

    try {
        DistributedCache.addCacheFile(new URI("/Input/dictionary.txt"), job.getConfiguration());
    } catch (Exception e) {
        System.out.println(e);
    }

    job.setMapperClass(MyMapper.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Solution

  • For every token you find in map, you call HammingDist, which iterates over List<String> Lst and converts each item to a char[]. It would be better to replace List<String> Lst with List<char[]> and add the elements already converted in setup, instead of converting the same words over and over again; see the sketch below.
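
A minimal, self-contained sketch of that change (the class name HammingIndexSketch and the small main driver are illustrative only, not from the question; using a StringBuilder instead of repeated string concatenation is an extra micro-optimization on top of the suggested List<char[]>):

import java.util.ArrayList;
import java.util.List;

public class HammingIndexSketch {

    // Dictionary stored as char[] once, instead of List<String>,
    // so each word is converted a single time at load.
    private static List<char[]> dict = new ArrayList<char[]>();

    // Same distance logic as the question's HammingDist, but the dictionary
    // entries are pre-converted and the query token is converted once per call.
    private static String hammingDist(String ky) {
        char[] s1 = ky.toCharArray();
        StringBuilder result = new StringBuilder();
        for (char[] s2 : dict) {
            int shorter = Math.min(s1.length, s2.length);
            int distance = Math.abs(s1.length - s2.length); // length gap counts as edits
            for (int i = 0; i < shorter; i++) {
                if (s1[i] != s2[i]) distance++;
            }
            if (distance < 2) {
                result.append(',').append(s2); // append(char[]) appends the characters
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        // In the real mapper these adds would happen in setup() while reading
        // dictionary.txt from the distributed cache.
        dict.add("cat".toCharArray());
        dict.add("car".toCharArray());
        dict.add("dog".toCharArray());
        System.out.println(hammingDist("cat")); // prints ,cat,car
    }
}

In the mapper itself, the only changes would be the field type (List<char[]>), Lst.add(line.toCharArray()) in setup, and hoisting ky.toCharArray() out of the dictionary loop.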