How to join two files via Cascading


Let's see what we have. The first file has the columns [Interface Class]:

list arrayList
list linkedList

The second file has the columns [Class countOfInstance]:

arrayList 120
linkedList 4

I would like to join these two files on the key [Class] and get the total count per Interface:

list 124

Here is my code so far:

public class Main
{
  public static void main( String[] args )
  {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];
    String stopPath = args[ 2 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    AppProps.setApplicationName( properties, "Part 1" );
    AppProps.addApplicationTag( properties, "lets:do:it" );
    AppProps.addApplicationTag( properties, "technology:Cascading" );
    FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );

    // create source and sink taps
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    Fields stop = new Fields( "class" );
    Tap classTap = new Hfs( new TextDelimited( true, "\t" ), stopPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "interface" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    Fields fieldSelector = new Fields( "interface", "class" );
    Pipe docPipe = new Each( "token", text, splitter, fieldSelector );

    // define "ScrubFunction" to clean up the token stream
    Fields scrubArguments = new Fields( "interface", "class" );
    docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );

    Fields text1 = new Fields( "amount" );
    // RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    Fields fieldSelector1 = new Fields( "class", "amount" );
    Pipe stopPipe = new Each( "token1", text1, splitter, fieldSelector1 );
    Pipe tokenPipe = new CoGroup( docPipe, token, stopPipe, text, new InnerJoin() );
    tokenPipe = new Each( tokenPipe, text, new RegexFilter( "^$" ) );

    // determine the word counts
    Pipe wcPipe = new Pipe( "wc", tokenPipe );
    wcPipe = new Retain( wcPipe, token );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef().setName( "wc" ).addSource( docPipe, docTap ).addSource( stopPipe, classTap ).addTailSink( wcPipe, wcTap );

    // write a DOT file and run the flow
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
  }
}

[I decided to resolve this issue step by step and will leave the final result here for others. The first step is "Couldn't join two files with one key via Cascading" (not completed yet).]


Solution

  • I would read the two files into two Map objects (class to interface, and class to count), iterate through the keys, and sum the counts per interface. Then you can write the result back to a file.

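      // needs java.util.HashMap and java.util.Map
      Map<String,String> nameToType = new HashMap<String,String>();    // class -> interface, e.g. arrayList -> list
      Map<String,Integer> nameToCount = new HashMap<String,Integer>(); // class -> count, e.g. arrayList -> 120
      //fill Maps from file here
      Map<String,Integer> result = new HashMap<String,Integer>();      // interface -> total count
      for (String name : nameToType.keySet())
      {
            String type = nameToType.get(name);
            // look up the count by class name, not by interface
            Integer count = nameToCount.get(name);
            if (count == null)
                continue; // class has no entry in the second file: skip it (inner join)

            if (!result.containsKey(type))
                result.put(type, 0);
            result.put(type, result.get(type) + count);
       }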
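
    For completeness, here is a minimal self-contained sketch of the same idea, including the "fill Maps from file" step. It is plain Java rather than Cascading, and it rests on a few assumptions that are not in the question: the fields are separated by whitespace, the files have no header line, and a class missing from the second file is simply skipped. The class name JoinCounts and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinCounts {

    // Parses lines of the form "<interface> <class>" into a class -> interface map.
    static Map<String, String> parseClassToInterface(List<String> lines) {
        Map<String, String> classToInterface = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length == 2) {            // ignore blank or malformed lines
                classToInterface.put(parts[1], parts[0]);
            }
        }
        return classToInterface;
    }

    // Parses lines of the form "<class> <count>" into a class -> count map.
    static Map<String, Integer> parseClassToCount(List<String> lines) {
        Map<String, Integer> classToCount = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length == 2) {            // ignore blank or malformed lines
                classToCount.put(parts[0], Integer.parseInt(parts[1]));
            }
        }
        return classToCount;
    }

    // Inner join on the class name, then sum the counts per interface.
    static Map<String, Integer> sumPerInterface(Map<String, String> classToInterface,
                                                Map<String, Integer> classToCount) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : classToInterface.entrySet()) {
            Integer count = classToCount.get(entry.getKey());
            if (count != null) {                // class present in both inputs
                result.merge(entry.getValue(), count, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data from the question, inlined instead of read from disk.
        List<String> first = Arrays.asList("list arrayList", "list linkedList");
        List<String> second = Arrays.asList("arrayList 120", "linkedList 4");

        Map<String, Integer> totals =
            sumPerInterface(parseClassToInterface(first), parseClassToCount(second));

        // Prints one "<interface> <total>" line per interface.
        totals.forEach((iface, total) -> System.out.println(iface + " " + total));
    }
}
```

    To run it against real files, the two lists can be loaded with java.nio.file.Files.readAllLines(Paths.get(path)). Note that this only works while both files fit in memory; for data that really needs Hadoop, the join still has to be expressed in Cascading itself.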