Search code examples
Tags: java, hadoop, mapreduce, reduce-reduce-conflict

Text/String values sent in the Mapper are wrong when I read them in the Reducer


I am sending some data from the Mapper, and when I read them in the Reducer they have changed slightly. In summary, I fill in the data using the set methods in the mapper and then read them in the reducer using the get methods. I do not understand why the values are different when I print them with println.

The data I am sending are held in a class called "ValorFechaHora", which has three fields: Medicion, Fecha and Hora:

public class ValorFechaHora implements Writable {

	private IntWritable Medicion;
	private Text Fecha;
	private Text Hora;
	
	public void ValorFechaHora(){
		
	}
	
	public void ValorFechaHora(IntWritable Medicion, Text Fecha, Text Hora){
		setMedicion(Medicion);
		setFecha(Fecha);
		setHora(Hora);
	}
	
	public IntWritable getMedicion() {
		return Medicion;
	}

	public void setMedicion(IntWritable medicion) {
		Medicion = medicion;
	}

	public Text getFecha() {
		return Fecha;
	}

	public void setFecha(Text fecha) {
		Fecha = fecha;
	}

	public Text getHora() {
		return Hora;
	}

	public void setHora(Text hora) {
		Hora = hora;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((Fecha == null) ? 0 : Fecha.hashCode());
		result = prime * result + ((Hora == null) ? 0 : Hora.hashCode());
		result = prime * result
				+ ((Medicion == null) ? 0 : Medicion.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		ValorFechaHora other = (ValorFechaHora) obj;
		if (Fecha == null) {
			if (other.Fecha != null)
				return false;
		} else if (!Fecha.equals(other.Fecha))
			return false;
		if (Hora == null) {
			if (other.Hora != null)
				return false;
		} else if (!Hora.equals(other.Hora))
			return false;
		if (Medicion == null) {
			if (other.Medicion != null)
				return false;
		} else if (!Medicion.equals(other.Medicion))
			return false;
		return true;
	}

	public void readFields(DataInput in) throws IOException {
		
		Medicion = new IntWritable(in.readInt());
		Fecha = new Text(in.readLine());
		Hora = new Text(in.readLine());
	}
	
	public void write(DataOutput out) throws IOException {
		Medicion.write(out);
		Fecha.write(out);
		Hora.write(out);
	}	

}

Here you can see my mapper:

public static class LogsMapper extends
			Mapper<LongWritable, Text, Text, ValorFechaHora> {

		//En el mapper emitimos lo que leemos. Key = Dirección MAC. Value = Medición + Fecha + Hora
		
		private Text outKey = new Text();
		private ValorFechaHora outValue = new ValorFechaHora();
		
		/**
		 * Emits one (MAC address, measurement + date + time) pair per input line.
		 *
		 * <p>Expected line layout, e.g.
		 * {@code "2536816<TAB>-47dB;8C:3A:E3:92:CB:3E;2014-11-12;14:22:20.795806"}:
		 * the column before the tab is ignored, and the column after it is split
		 * on ';' into measurement, MAC, date and time.
		 *
		 * @param offset  position of the line in the input (unused)
		 * @param line    the raw input line
		 * @param context the job context the pair is written to
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if writing to the context is interrupted
		 * @throws ArrayIndexOutOfBoundsException if the line does not have the
		 *         expected tab / ';' structure
		 * @throws NumberFormatException if the measurement has no leading integer
		 */
		@Override
		protected void map(LongWritable offset, Text line, Context context)
				throws IOException, InterruptedException {

			// Only the second tab-separated column carries the record.
			String[] columns = line.toString().split("\t");
			String[] row = columns[1].split(";");

			// row[0] = measurement, row[1] = MAC, row[2] = date, row[3] = time

			// The MAC address is the key; reuse the pre-allocated key object.
			outKey.set(row[1]);

			// Measurement, date and time form the value.
			outValue.setMedicion(new IntWritable(parseMedicion(row[0])));
			outValue.setFecha(new Text(row[2]));
			outValue.setHora(new Text(row[3]));

			context.write(outKey, outValue);
		}

		/**
		 * Parses the leading signed integer of a measurement such as "-47dB".
		 * Unlike a fixed three-character prefix, this also handles values with
		 * fewer or more digits ("-5dB", "-100dB").
		 */
		private static int parseMedicion(String raw) {
			int end = 0;
			if (end < raw.length() && (raw.charAt(end) == '-' || raw.charAt(end) == '+')) {
				end++;
			}
			while (end < raw.length() && Character.isDigit(raw.charAt(end))) {
				end++;
			}
			return Integer.parseInt(raw.substring(0, end));
		}

Here is my reducer:

public static class MaxReducer extends
			Reducer<Text, ValorFechaHora, Text, Text> {
		
		//En el reduce por ahora únicamente contamos el número de veces que ha sido la MAC registrada
		
		/**
		 * Counts how many values were received for one MAC address and emits
		 * (MAC, count), with the count rendered as decimal text.
		 *
		 * <p>While iterating, each value's measurement, date and time are
		 * printed to standard output as a debugging trace.
		 *
		 * @param MAC     the MAC address acting as the key
		 * @param values  all values grouped under that key
		 * @param context the job context the result is written to
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if writing to the context is interrupted
		 */
		@Override
		protected void reduce(Text MAC,
				Iterable<ValorFechaHora> values, Context context)
				throws IOException, InterruptedException {

			int sum = 0;

			for (ValorFechaHora val : values) {
				// Debug trace of the deserialized value.
				System.out.println("1" + " " + val.getMedicion().toString());
				System.out.println("2" + " " + val.getFecha().toString());
				System.out.println("3" + " " + val.getHora().toString());

				sum++;
			}

			// The incoming key is emitted unchanged; no intermediate Text is needed.
			context.write(MAC, new Text(Integer.toString(sum)));
		}

Well, I do not understand why, inside the loop in the reducer, the value of val.getFecha().toString() is different from the value of outValue.getFecha().toString() in the mapper.

TYA


Solution

  • You're using the wrong method calls to populate the Text objects. You should be using the readFields method of the Text object.

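    Currently you are trying to populate the Text object via the constructor, which accepts a String as its argument. You can't just read a String back from the DataInput object using in.readLine, because a Text object is serialized to the data stream as a variable-length byte count followed by its UTF-8 bytes, without a terminating newline. readLine therefore turns the length prefix into a character and keeps reading until it finds a newline or the end of the input, which is why the values you print look slightly different.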

    To fix this, you should either initialize your fields once and then just call their readFields methods (this may have other knock-on effects in your code, as you're not currently using the object re-use pattern, which is more efficient than creating new objects for each K/V pair):

    private IntWritable Medicion = new IntWritable();
    private Text Fecha = new Text();
    private Text Hora = new Text();
    
    /**
     * Restores the three fields in place, in the order they were written
     * (Medicion, Fecha, Hora), by delegating to each field's own readFields.
     * Requires the fields to be non-null, as initialized in their declarations.
     *
     * @param in the stream to read from
     * @throws IOException if the underlying stream fails or ends early
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        Medicion.readFields(in);
        Fecha.readFields(in);
        Hora.readFields(in);
    }
    

    Otherwise to keep your code as is (but less efficient) just update the readFields method as follows:

    /**
     * Restores the three fields into freshly allocated objects, in the order
     * they were written (Medicion, Fecha, Hora).
     *
     * @param in the stream to read from
     * @throws IOException if the underlying stream fails or ends early
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Medicion is an IntWritable, so it must be rebuilt as one (not as a Text).
        Medicion = new IntWritable();
        Medicion.readFields(in);

        Fecha = new Text();
        Fecha.readFields(in);

        Hora = new Text();
        Hora.readFields(in);
    }