Tags: hadoop, distributed-cache

Files not put correctly into distributed cache


I am adding a file to distributed cache using the following code:

Configuration conf2 = new Configuration();      
job = new Job(conf2);
job.setJobName("Join with Cache");
DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);

Then I read the file into the mappers:

protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    URI[] cacheFile = DistributedCache.getCacheFiles(conf);
    FSDataInputStream in = FileSystem.get(conf).open(new Path(cacheFile[0].getPath()));
    BufferedReader joinReader = new BufferedReader(new InputStreamReader(in));

    String line;
    try {
        while ((line = joinReader.readLine()) != null) {
            String[] s = line.split("\t");
            // do stuff to s
        }
    } finally {
        joinReader.close();
    }
}

The problem is that I only read in one line, and it is not the file I put into the cache. Rather it is: cm9vdA==, which is "root" in base64.

Has anyone else had this problem, or can anyone see how I'm using the distributed cache incorrectly? I am using Hadoop 0.20.2, fully distributed.


Solution

  • Common mistake in your job configuration:

    Configuration conf2 = new Configuration();      
    job = new Job(conf2);
    job.setJobName("Join with Cache");
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);
    

    After you create your Job object, you need to pull the Configuration object back from it: Job makes a copy of the Configuration you pass in, so setting values on conf2 after creating the job has no effect on the job itself. Try this:

    job = new Job(new Configuration());
    Configuration conf2 = job.getConfiguration();
    job.setJobName("Join with Cache");
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);
    

    You should also check the number of files in the distributed cache; there is probably more than one, and you're opening a random file, which is giving you the value you are seeing.
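    If there is more than one cached file, you can select the right entry by matching on its path instead of assuming index 0. A minimal sketch using plain java.net.URI to illustrate the idea (the helper name and example URIs are illustrative, not part of the original code; in a real mapper you would pass in the array returned by DistributedCache.getCacheFiles(conf)):

```java
import java.net.URI;

public class CacheFileFinder {
    // Return the cached URI whose path ends with the given file name,
    // or null if none matches. The distributed cache may contain more
    // entries than the one you added (e.g. jars added via -libjars).
    static URI findCacheFile(URI[] cacheFiles, String fileName) {
        if (cacheFiles == null) {
            return null;
        }
        for (URI uri : cacheFiles) {
            if (uri.getPath().endsWith(fileName)) {
                return uri;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical cache contents: a jar plus the file we actually want.
        URI[] cached = {
            URI.create("hdfs://namenode:8020/other/archive.jar"),
            URI.create("hdfs://namenode:8020/FilePath/part-r-00000")
        };
        URI match = findCacheFile(cached, "part-r-00000");
        System.out.println(match.getPath()); // /FilePath/part-r-00000
    }
}
```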

    I suggest you use symlinking, which will make the file available in the local working directory under a known name:

    DistributedCache.createSymlink(conf2);
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000#myfile"), conf2);
    
    // then in your mapper setup:
    BufferedReader joinReader = new BufferedReader(new FileReader("myfile"));
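    Once the file is symlinked into the task's working directory, reading it is plain java.io. A runnable sketch of the tab-split loop, assuming the same tab-separated format as the question (it writes a stand-in file first, since the real symlink only exists inside a running task):

```java
import java.io.*;
import java.util.*;

public class SymlinkReadDemo {
    // Read tab-separated lines from a file in the local working
    // directory, as the symlinked cache file ("myfile") would appear
    // to the mapper's setup() method.
    static List<String[]> readTabSeparated(String path) throws IOException {
        List<String[]> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine()) != null) {
                rows.add(line.split("\t"));
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the symlinked cache file; contents are illustrative.
        File f = File.createTempFile("myfile", null);
        try (PrintWriter w = new PrintWriter(f)) {
            w.println("key1\tvalue1");
            w.println("key2\tvalue2");
        }
        List<String[]> rows = readTabSeparated(f.getPath());
        System.out.println(rows.size() + " rows, first key = " + rows.get(0)[0]);
    }
}
```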