Search code examples
java hadoop avro

Type mismatch in key from map


I'm getting this error:

java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

from this code:

public class MyAvroApplication {
    private static class MyReducer extends Reducer<Email, Text, Text, Text> {
        private Text from = new Text();
        private Text subject = new Text();

        public void reduce(Email key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            from.set(key.getFrom().toString());
            subject.set(key.getSubject().toString());
            context.write(from, subject);
        }
    }

    private static class MyAvroMapper extends Mapper<Text, Email, Email, Text> {
        protected void map(Email email, NullWritable value, Context context) throws IOException, InterruptedException {
            context.write(email,
                    new Text(email.getSubject().toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");

        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://127.0.0.1:8020/user/rich");
                conf.set("hadoop.job.ugi", "hdfs");

                Job job = Job.getInstance(conf, "mytest");

                AvroJob.setInputKeySchema(job, Email.getClassSchema());
                AvroJob.setOutputKeySchema(job, Email.getClassSchema());

                job.setJarByClass(MyApplication.class);
                job.setMapperClass(MyAvroMapper.class);
                job.setCombinerClass(MyReducer.class);
                job.setReducerClass(MyReducer.class);

//              job.setInputFormatClass(AvroKeyInputFormat.class);
//              job.setOutputFormatClass(AvroKeyOutputFormat.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                FileInputFormat.addInputPath(job, new Path("/user/rich/in/"));
                FileOutputFormat.setOutputPath(job, new Path("/user/rich/out/"));

                boolean result = job.waitForCompletion(true);

                System.out.println(result);

                return null;
            }
        });
    }
}

I'm really struggling to understand how to match up the various key/value type parameters on the way in and out of each stage. My intention is to read some Avro-format files in as Email records and then run some simple calculations; just counting the number of emails would be a good start.

Edit:

A similar error occurs with this Mapper:

private static class MyAvroMapper extends Mapper<Text, AvroKey<Email>, AvroKey<Email>, Text> {
    protected void map(Text key, AvroKey<Email> email, Context context) throws IOException, InterruptedException {
        context.write(email,
                new Text(email.datum().getSubject().toString()));
    }
}

And the error:

java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at com.test.myapp.MyAvroApplication$MyAvroMapper.map(MyAvroApplication.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The Email class generated from the schema:

/**
 * Avro specific-record class for the {@code Email} record declared in
 * {@link #SCHEMA$} (namespace {@code com.test.myapp.avro}).
 *
 * <p>The record has eight fields, addressed by position in {@link #get(int)}
 * and {@link #put(int, java.lang.Object)}: 0 message_id, 1 date, 2 from,
 * 3 subject, 4 body, 5 tos, 6 ccs, 7 bccs. This class is marked as
 * Avro-generated; keep it in sync with the schema rather than editing by hand.
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Email extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  /** Parsed Avro schema for this record. */
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"com.test.myapp.avro\",\"fields\":[{\"name\":\"message_id\",\"type\":[\"null\",\"string\"],\"doc\":\"\"},{\"name\":\"date\",\"type\":[\"long\",\"null\"]},{\"name\":\"from\",\"type\":{\"type\":\"record\",\"name\":\"EmailAddress\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"\"},{\"name\":\"address\",\"type\":[\"null\",\"string\"],\"doc\":\"\"}]}},{\"name\":\"subject\",\"type\":[\"string\",\"null\"]},{\"name\":\"body\",\"type\":[\"string\",\"null\"]},{\"name\":\"tos\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"},{\"name\":\"ccs\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"},{\"name\":\"bccs\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"}]}");
  /** Returns the schema shared by all instances of this class. */
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  /** Field 0; schema type is the union ["null","string"]. */
  @Deprecated public java.lang.CharSequence message_id;
  /** Field 1; schema type is the union ["long","null"]. */
  @Deprecated public java.lang.Long date;
  /** Field 2; a nested EmailAddress record. */
  @Deprecated public com.test.myapp.avro.EmailAddress from;
  /** Field 3; schema type is the union ["string","null"]. */
  @Deprecated public java.lang.CharSequence subject;
  /** Field 4; schema type is the union ["string","null"]. */
  @Deprecated public java.lang.CharSequence body;
  /** Field 5; schema type is null or an array whose items are null or EmailAddress. */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> tos;
  /** Field 6; schema type is null or an array whose items are null or EmailAddress. */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> ccs;
  /** Field 7; schema type is null or an array whose items are null or EmailAddress. */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> bccs;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Email() {}

  /**
   * All-args constructor. Assigns each argument to the field of the same
   * name without copying or validation.
   */
  public Email(java.lang.CharSequence message_id, java.lang.Long date, com.test.myapp.avro.EmailAddress from, java.lang.CharSequence subject, java.lang.CharSequence body, java.util.List<com.test.myapp.avro.EmailAddress> tos, java.util.List<com.test.myapp.avro.EmailAddress> ccs, java.util.List<com.test.myapp.avro.EmailAddress> bccs) {
    this.message_id = message_id;
    this.date = date;
    this.from = from;
    this.subject = subject;
    this.body = body;
    this.tos = tos;
    this.ccs = ccs;
    this.bccs = bccs;
  }

  /** Returns the schema of this instance (always {@link #SCHEMA$}). */
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  // Returns the field stored at the given position (0..7); any other
  // index throws AvroRuntimeException("Bad index").
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return message_id;
    case 1: return date;
    case 2: return from;
    case 3: return subject;
    case 4: return body;
    case 5: return tos;
    case 6: return ccs;
    case 7: return bccs;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call.
  // Casts value$ to the Java type of the field at the given position (0..7)
  // and stores it; any other index throws AvroRuntimeException("Bad index").
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: message_id = (java.lang.CharSequence)value$; break;
    case 1: date = (java.lang.Long)value$; break;
    case 2: from = (com.test.myapp.avro.EmailAddress)value$; break;
    case 3: subject = (java.lang.CharSequence)value$; break;
    case 4: body = (java.lang.CharSequence)value$; break;
    case 5: tos = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    case 6: ccs = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    case 7: bccs = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'message_id' field.
   */
  public java.lang.CharSequence getMessageId() {
    return message_id;
  }

  /**
   * Sets the value of the 'message_id' field.
   * @param value the value to set.
   */
  public void setMessageId(java.lang.CharSequence value) {
    this.message_id = value;
  }

  /**
   * Gets the value of the 'date' field.
   */
  public java.lang.Long getDate() {
    return date;
  }

  /**
   * Sets the value of the 'date' field.
   * @param value the value to set.
   */
  public void setDate(java.lang.Long value) {
    this.date = value;
  }

  /**
   * Gets the value of the 'from' field.
   */
  public com.test.myapp.avro.EmailAddress getFrom() {
    return from;
  }

  /**
   * Sets the value of the 'from' field.
   * @param value the value to set.
   */
  public void setFrom(com.test.myapp.avro.EmailAddress value) {
    this.from = value;
  }

  /**
   * Gets the value of the 'subject' field.
   */
  public java.lang.CharSequence getSubject() {
    return subject;
  }

  /**
   * Sets the value of the 'subject' field.
   * @param value the value to set.
   */
  public void setSubject(java.lang.CharSequence value) {
    this.subject = value;
  }

  /**
   * Gets the value of the 'body' field.
   */
  public java.lang.CharSequence getBody() {
    return body;
  }

  /**
   * Sets the value of the 'body' field.
   * @param value the value to set.
   */
  public void setBody(java.lang.CharSequence value) {
    this.body = value;
  }

  /**
   * Gets the value of the 'tos' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getTos() {
    return tos;
  }

  /**
   * Sets the value of the 'tos' field.
   * @param value the value to set.
   */
  public void setTos(java.util.List<com.test.myapp.avro.EmailAddress> value) {
    this.tos = value;
  }

  /**
   * Gets the value of the 'ccs' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getCcs() {
    return ccs;
  }

  /**
   * Sets the value of the 'ccs' field.
   * @param value the value to set.
   */
  public void setCcs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
    this.ccs = value;
  }

  /**
   * Gets the value of the 'bccs' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getBccs() {
    return bccs;
  }

  /**
   * Sets the value of the 'bccs' field.
   * @param value the value to set.
   */
  public void setBccs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
    this.bccs = value;
  }

  /** Creates a new Email RecordBuilder */
  public static com.test.myapp.avro.Email.Builder newBuilder() {
    return new com.test.myapp.avro.Email.Builder();
  }

  /** Creates a new Email RecordBuilder by copying an existing Builder */
  public static com.test.myapp.avro.Email.Builder newBuilder(com.test.myapp.avro.Email.Builder other) {
    return new com.test.myapp.avro.Email.Builder(other);
  }

  /** Creates a new Email RecordBuilder by copying an existing Email instance */
  public static com.test.myapp.avro.Email.Builder newBuilder(com.test.myapp.avro.Email other) {
    return new com.test.myapp.avro.Email.Builder(other);
  }

  /**
   * RecordBuilder for Email instances.
   *
   * <p>Each setter validates its value against the corresponding schema field
   * and records that the field was set; {@link #build()} uses the schema
   * default for any field that was not set. Constructors are private, so
   * instances are obtained through the {@code Email.newBuilder(...)} factories.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Email>
    implements org.apache.avro.data.RecordBuilder<Email> {

    // Pending field values, in schema order (indices 0..7).
    private java.lang.CharSequence message_id;
    private java.lang.Long date;
    private com.test.myapp.avro.EmailAddress from;
    private java.lang.CharSequence subject;
    private java.lang.CharSequence body;
    private java.util.List<com.test.myapp.avro.EmailAddress> tos;
    private java.util.List<com.test.myapp.avro.EmailAddress> ccs;
    private java.util.List<com.test.myapp.avro.EmailAddress> bccs;

    /** Creates a new Builder */
    private Builder() {
      super(com.test.myapp.avro.Email.SCHEMA$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     * Every field of {@code other} that passes isValidValue is deep-copied
     * and flagged as set.
     */
    private Builder(com.test.myapp.avro.Email.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.message_id)) {
        this.message_id = data().deepCopy(fields()[0].schema(), other.message_id);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.date)) {
        this.date = data().deepCopy(fields()[1].schema(), other.date);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.from)) {
        this.from = data().deepCopy(fields()[2].schema(), other.from);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.subject)) {
        this.subject = data().deepCopy(fields()[3].schema(), other.subject);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.body)) {
        this.body = data().deepCopy(fields()[4].schema(), other.body);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.tos)) {
        this.tos = data().deepCopy(fields()[5].schema(), other.tos);
        fieldSetFlags()[5] = true;
      }
      if (isValidValue(fields()[6], other.ccs)) {
        this.ccs = data().deepCopy(fields()[6].schema(), other.ccs);
        fieldSetFlags()[6] = true;
      }
      if (isValidValue(fields()[7], other.bccs)) {
        this.bccs = data().deepCopy(fields()[7].schema(), other.bccs);
        fieldSetFlags()[7] = true;
      }
    }

    /**
     * Creates a Builder by copying an existing Email instance.
     * Every field of {@code other} that passes isValidValue is deep-copied
     * and flagged as set.
     */
    private Builder(com.test.myapp.avro.Email other) {
            super(com.test.myapp.avro.Email.SCHEMA$);
      if (isValidValue(fields()[0], other.message_id)) {
        this.message_id = data().deepCopy(fields()[0].schema(), other.message_id);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.date)) {
        this.date = data().deepCopy(fields()[1].schema(), other.date);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.from)) {
        this.from = data().deepCopy(fields()[2].schema(), other.from);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.subject)) {
        this.subject = data().deepCopy(fields()[3].schema(), other.subject);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.body)) {
        this.body = data().deepCopy(fields()[4].schema(), other.body);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.tos)) {
        this.tos = data().deepCopy(fields()[5].schema(), other.tos);
        fieldSetFlags()[5] = true;
      }
      if (isValidValue(fields()[6], other.ccs)) {
        this.ccs = data().deepCopy(fields()[6].schema(), other.ccs);
        fieldSetFlags()[6] = true;
      }
      if (isValidValue(fields()[7], other.bccs)) {
        this.bccs = data().deepCopy(fields()[7].schema(), other.bccs);
        fieldSetFlags()[7] = true;
      }
    }

    /** Gets the value of the 'message_id' field */
    public java.lang.CharSequence getMessageId() {
      return message_id;
    }

    /** Sets the value of the 'message_id' field */
    public com.test.myapp.avro.Email.Builder setMessageId(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.message_id = value;
      fieldSetFlags()[0] = true;
      return this; 
    }

    /** Checks whether the 'message_id' field has been set */
    public boolean hasMessageId() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'message_id' field */
    public com.test.myapp.avro.Email.Builder clearMessageId() {
      message_id = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'date' field */
    public java.lang.Long getDate() {
      return date;
    }

    /** Sets the value of the 'date' field */
    public com.test.myapp.avro.Email.Builder setDate(java.lang.Long value) {
      validate(fields()[1], value);
      this.date = value;
      fieldSetFlags()[1] = true;
      return this; 
    }

    /** Checks whether the 'date' field has been set */
    public boolean hasDate() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'date' field */
    public com.test.myapp.avro.Email.Builder clearDate() {
      date = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'from' field */
    public com.test.myapp.avro.EmailAddress getFrom() {
      return from;
    }

    /** Sets the value of the 'from' field */
    public com.test.myapp.avro.Email.Builder setFrom(com.test.myapp.avro.EmailAddress value) {
      validate(fields()[2], value);
      this.from = value;
      fieldSetFlags()[2] = true;
      return this; 
    }

    /** Checks whether the 'from' field has been set */
    public boolean hasFrom() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'from' field */
    public com.test.myapp.avro.Email.Builder clearFrom() {
      from = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    /** Gets the value of the 'subject' field */
    public java.lang.CharSequence getSubject() {
      return subject;
    }

    /** Sets the value of the 'subject' field */
    public com.test.myapp.avro.Email.Builder setSubject(java.lang.CharSequence value) {
      validate(fields()[3], value);
      this.subject = value;
      fieldSetFlags()[3] = true;
      return this; 
    }

    /** Checks whether the 'subject' field has been set */
    public boolean hasSubject() {
      return fieldSetFlags()[3];
    }

    /** Clears the value of the 'subject' field */
    public com.test.myapp.avro.Email.Builder clearSubject() {
      subject = null;
      fieldSetFlags()[3] = false;
      return this;
    }

    /** Gets the value of the 'body' field */
    public java.lang.CharSequence getBody() {
      return body;
    }

    /** Sets the value of the 'body' field */
    public com.test.myapp.avro.Email.Builder setBody(java.lang.CharSequence value) {
      validate(fields()[4], value);
      this.body = value;
      fieldSetFlags()[4] = true;
      return this; 
    }

    /** Checks whether the 'body' field has been set */
    public boolean hasBody() {
      return fieldSetFlags()[4];
    }

    /** Clears the value of the 'body' field */
    public com.test.myapp.avro.Email.Builder clearBody() {
      body = null;
      fieldSetFlags()[4] = false;
      return this;
    }

    /** Gets the value of the 'tos' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getTos() {
      return tos;
    }

    /** Sets the value of the 'tos' field */
    public com.test.myapp.avro.Email.Builder setTos(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[5], value);
      this.tos = value;
      fieldSetFlags()[5] = true;
      return this; 
    }

    /** Checks whether the 'tos' field has been set */
    public boolean hasTos() {
      return fieldSetFlags()[5];
    }

    /** Clears the value of the 'tos' field */
    public com.test.myapp.avro.Email.Builder clearTos() {
      tos = null;
      fieldSetFlags()[5] = false;
      return this;
    }

    /** Gets the value of the 'ccs' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getCcs() {
      return ccs;
    }

    /** Sets the value of the 'ccs' field */
    public com.test.myapp.avro.Email.Builder setCcs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[6], value);
      this.ccs = value;
      fieldSetFlags()[6] = true;
      return this; 
    }

    /** Checks whether the 'ccs' field has been set */
    public boolean hasCcs() {
      return fieldSetFlags()[6];
    }

    /** Clears the value of the 'ccs' field */
    public com.test.myapp.avro.Email.Builder clearCcs() {
      ccs = null;
      fieldSetFlags()[6] = false;
      return this;
    }

    /** Gets the value of the 'bccs' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getBccs() {
      return bccs;
    }

    /** Sets the value of the 'bccs' field */
    public com.test.myapp.avro.Email.Builder setBccs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[7], value);
      this.bccs = value;
      fieldSetFlags()[7] = true;
      return this; 
    }

    /** Checks whether the 'bccs' field has been set */
    public boolean hasBccs() {
      return fieldSetFlags()[7];
    }

    /** Clears the value of the 'bccs' field */
    public com.test.myapp.avro.Email.Builder clearBccs() {
      bccs = null;
      fieldSetFlags()[7] = false;
      return this;
    }

    /**
     * Builds a new Email from the builder's current state.
     * For each field, the builder's value is used when its set-flag is true;
     * otherwise the result of defaultValue(...) for that field is used.
     *
     * @throws org.apache.avro.AvroRuntimeException wrapping any exception
     *         raised while assembling the record
     */
    @Override
    public Email build() {
      try {
        Email record = new Email();
        record.message_id = fieldSetFlags()[0] ? this.message_id : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.date = fieldSetFlags()[1] ? this.date : (java.lang.Long) defaultValue(fields()[1]);
        record.from = fieldSetFlags()[2] ? this.from : (com.test.myapp.avro.EmailAddress) defaultValue(fields()[2]);
        record.subject = fieldSetFlags()[3] ? this.subject : (java.lang.CharSequence) defaultValue(fields()[3]);
        record.body = fieldSetFlags()[4] ? this.body : (java.lang.CharSequence) defaultValue(fields()[4]);
        record.tos = fieldSetFlags()[5] ? this.tos : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[5]);
        record.ccs = fieldSetFlags()[6] ? this.ccs : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[6]);
        record.bccs = fieldSetFlags()[7] ? this.bccs : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[7]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}

Solution

  • I think there's a problem in the mapper. When you declare:

    private static class MyAvroMapper extends Mapper<Text, Email, Email, Text> {
    

    you're telling Hadoop to expect the pair (Text, Email) as the input key and value types, and the pair (Email, Text) as the key and value types the mapper will emit (these are the four types declared as generics for the class).

    But when you write a signature like this:

    protected void map(Email email, NullWritable value, Context context) throws IOException, InterruptedException {
    

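    you're telling Hadoop that the input key and value types are Email and NullWritable. Because that signature does not match the class's type parameters, it does not override Mapper.map; the inherited default map() runs instead and passes the input key (a LongWritable) straight through to the output, hence the exception.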