Search code examples
javahadoopcascadingbigdata

Cascading WordCount Java task throws NullPointerException


I am learning Cascading. I have a Gradle project from the tutorial (https://github.com/Cascading/tutorials.git). It contains a single WordCount class:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;    
import cascading.flow.hadoop.*;
import cascading.property.AppProps;

public class WordCount{

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable( 1 );
    private Text word = new Text();

    public void map( LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter ) throws IOException{
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer( line );
      while( tokenizer.hasMoreTokens() )
        {
        word.set( tokenizer.nextToken() );
        output.collect( word, one );
        }
      }
    }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter ) throws IOException{
      int sum = 0;
      while( values.hasNext() )
        {
        sum += values.next().get();
        }
      output.collect( key, new IntWritable( sum ) );
      }
    }

  public static void main( String[] args ) throws Exception{

    Properties properties = new Properties();
    AppProps.addApplicationTag( properties, "tutorials" );
    AppProps.addApplicationTag( properties, "cluster:development" );
    AppProps.setApplicationName( properties, "cascading-mapreduce-flow" );

    JobConf conf = new JobConf( WordCount.class );
    conf.setJobName( "casading-mapreduce-flow" );

    conf.setOutputKeyClass( Text.class );
    conf.setOutputValueClass( IntWritable.class );

    conf.setMapperClass( Map.class );
    conf.setCombinerClass( Reduce.class );
    conf.setReducerClass( Reduce.class );

    conf.setInputFormat( TextInputFormat.class );
    conf.setOutputFormat( TextOutputFormat.class );

    FileInputFormat.setInputPaths( conf, new Path( args[ 0 ] ) );
    FileOutputFormat.setOutputPath( conf, new Path( args[ 1 ] ) );

    MapReduceFlow flow = new MapReduceFlow( "wordcount", conf, true );

    // JobClient.runJob(conf);
    flow.complete();    
    }
  }

and the Gradle build file:

 // Gradle build script: compiles the Java sources and assembles a jar that
 // carries its compile dependencies under lib/.
 apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
// NOTE(review): 'provided-base' is presumably supplied by the nebula plugin on
// the buildscript classpath below and adds the `provided` configuration — confirm.
apply plugin: 'provided-base'

// Versions shared by the dependency declarations below.
ext.cascadingVersion = '3.0.0'
ext.hadoop2Version = '2.6.0'

// Classpath for the build script itself (plugins), not for the project code.
buildscript {
  repositories {
    maven { url 'http://repo.spring.io/plugins-release' }
  }
  dependencies {
    classpath 'com.netflix.nebula:gradle-extra-configurations-plugin:2.2.1'
  }
}

dependencies {
  // Cascading libraries: compile-scoped, so they end up in the jar's lib/ directory.
  compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-local', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-hadoop2-mr1', version: cascadingVersion )

  // Hadoop libraries: declared as `provided`, which the jar task below subtracts
  // from the bundled set.
  provided( group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoop2Version )
  provided( group: 'org.apache.hadoop', name: 'hadoop-client', version: hadoop2Version )
  provided( group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: hadoop2Version )
}

jar {
  description = "Assembles a Hadoop ready jar file"
  // Just before the jar is built, copy the compile dependencies (minus the
  // provided ones) into lib/ inside the archive.
  doFirst {
    into( 'lib' ) {
      from { configurations.compile.minus( [configurations.provided] ) }
    }
  }

  // Records the entry point in the jar manifest.
  // NOTE(review): with Main-Class set, `hadoop jar` presumably uses it and treats
  // every command-line argument as a program argument — confirm against RunJar.
  manifest {
    attributes( "Main-Class": "mr/WordCount" )
  }
}

I built the jar with the command `gradle clean jar` and then ran it with the command `hadoop jar ./build/libs/cascading-mr.jar /wc_input /wc_output`, which produced this exception:

INFO property.AppProps: using app.id: 8ED078B09B0847F692A77ABBE15601F6
Exception in thread "main" java.lang.NullPointerException
    at cascading.flow.planner.BaseFlowStep.configure(BaseFlowStep.java:143)
    at cascading.flow.planner.BaseFlowStep.<init>(BaseFlowStep.java:137)
    at cascading.flow.hadoop.HadoopFlowStep.<init>(HadoopFlowStep.java:84)
    at cascading.flow.hadoop.MapReduceFlowStep.<init>(MapReduceFlowStep.java:35)
    at cascading.flow.hadoop.MapReduceFlow.makeStepGraph(MapReduceFlow.java:144)
    at cascading.flow.hadoop.MapReduceFlow.<init>(MapReduceFlow.java:133)
    at cascading.flow.hadoop.MapReduceFlow.<init>(MapReduceFlow.java:112)
    at mr.WordCount.main(WordCount.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

I have just started learning Cascading and am not yet comfortable with it, so any suggestions would be helpful.


Solution

  • I think you have not specified the main class in the command:

    hadoop jar ./build/libs/cascading-mr.jar WordCount /wc_input /wc_output
    

    If the main class is in a package:

    hadoop jar ./build/libs/cascading-mr.jar packageName.className /wc_input /wc_output