Search code examples
cascading

Create new field


How can I create a new field for my results?

I'm using a wordcount job that I want to append a timestamp to. Currently it contains the fields 'word' and 'count'.

My goal is to create a tuple that looks like this: 'word' 'count' 'timestamp'

This is my code so far. I try to append the timestamp within a custom function that is called 'TimestampAppender'

wcPipe = new Each(wcPipe, Fields.ALL, new TimestampAppender(Fields.ALL), Fields.RESULTS);

TimestampAppender:

public class TimestampAppender extends BaseOperation implements Function {
  public TimestampAppender(Fields fieldDeclaration) {
    super(2, fieldDeclaration);
}

public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry argument = functionCall.getArguments();

    String arg0 = argument.getString(0);
    String arg1 = argument.getString(1);

    Tuple result = new Tuple();
    result.addString(arg0);
    result.addString(arg1);
    result.addString("01-01-2015");

    functionCall.getOutputCollector().add(result);
}

Solution

  • You can change your code as:

    wcPipe = new Each(wcPipe, new Insert(new Fields("timestamp"), ""), Fields.ALL);
    wcPipe = new Each(wcPipe, Fields.ALL, new TimestampAppender(Fields.ALL), Fields.RESULTS);
    

    TimestampAppender:

    public class TimestampAppender extends BaseOperation implements Function {
      public TimestampAppender(Fields fieldDeclaration) {
        super(Fields.ARGS);
    }
    
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        TupleEntry argument = functionCall.getArguments();
    
        String arg0 = argument.getString(0);
        String arg1 = argument.getString(1);
    
        Tuple result = new Tuple();
        result.addString(arg0);
        result.addString(arg1);
        result.addString("01-01-2015");
    
        functionCall.getOutputCollector().add(result);
    }