
Loading Files in UDF


I have a requirement to populate a field based on the evaluation of a UDF. The input to the UDF would be some other fields in the input as well as a CSV sheet. Presently, the approach I have taken is to load the CSV file, group it ALL, and then pass it as a bag to the UDF along with the other required parameters. However, it's taking a very long time to complete (roughly 3 hours) for source data of 170k records and about 150k CSV records.

I'm sure there must be a much more efficient way to handle this, and hence I need your inputs.

source_alias = LOAD 'src.csv' USING 
                            PigStorage(',') AS (f1:chararray,f2:chararray,f3:chararray);

csv_alias = LOAD 'csv_file.csv' USING 
                            PigStorage(',') AS (c1:chararray,c2:chararray,c3:chararray);

grpd_csv_alias = GROUP csv_alias ALL;

final_alias = FOREACH source_alias GENERATE f1 AS f1,
myUDF(grpd_csv_alias, f2) AS derived_f2;

Here is my UDF at a high level.

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class myUDF extends EvalFunc<String> {

    public String exec(Tuple input) throws IOException {

        String f2Response = "N";
        // The whole grouped CSV arrives as a bag with every source row.
        DataBag csvAliasBag = (DataBag) input.get(0);
        String f2 = (String) input.get(1);

        try {
            Iterator<Tuple> bagIterator = csvAliasBag.iterator();

            // Full scan of the CSV bag for each source row.
            while (bagIterator.hasNext()) {
                Tuple localTuple = bagIterator.next();
                String col1 = ((String) localTuple.get(1)).trim().toLowerCase();
                String col2 = ((String) localTuple.get(2)).trim().toLowerCase();
                String col3 = ((String) localTuple.get(3)).trim().toLowerCase();
                String col4 = ((String) localTuple.get(4)).trim().toLowerCase();

                // <Custom logic to populate f2Response based on the value in f2 as well as col1, col2, col3 and col4>
            }

            return f2Response;
        }
        catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
    }
}

I believe the process is taking so long because csv_alias is built and passed to the UDF for each row in the source file.

Is there any better way to handle this?

Thanks


Solution

  • For small files, you can put them in the distributed cache. This copies the file to each task node as a local file, which you then load yourself. I would not recommend parsing the file on every call to exec, however: store the parsed results in a class variable and check whether it has been initialized. If the CSV is on the local file system, use getShipFiles. If the CSV is on HDFS, use the getCacheFiles method. Notice that for HDFS the entry is a file path followed by a # and some text: to the left of the # is the HDFS path, and to the right is the name the file will have when it is copied to the local file system. Here's an example from the Pig docs UDF section.

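    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class Udfcachetest extends EvalFunc<String> {

        // Reads the first line of each file on every call; both files are
        // opened by their local names in the task's working directory.
        public String exec(Tuple input) throws IOException {
            String concatResult = "";
            FileReader fr = new FileReader("./smallfile1");
            BufferedReader d = new BufferedReader(fr);
            concatResult += d.readLine();
            fr = new FileReader("./smallfile2");
            d = new BufferedReader(fr);
            concatResult += d.readLine();
            return concatResult;
        }

        public List<String> getCacheFiles() {
            List<String> list = new ArrayList<String>(1);
            list.add("/user/pig/tests/data/small#smallfile1");  // HDFS file; the text after # is the local name
            return list;
        }

        public List<String> getShipFiles() {
            List<String> list = new ArrayList<String>(1);
            list.add("/home/hadoop/pig/smallfile2");  // local file
            return list;
        }
    }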
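
The docs example above re-reads its files on every call. The advice to parse once and keep the result in a class variable could look roughly like the sketch below. It is plain Java with no Pig dependency, so the caching logic can be read on its own. The names CsvLookup, lookup, loadIfNeeded and getLoadCount are hypothetical, and keying the map on the first CSV column is an assumption, since the question does not show the actual matching logic.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper. In a real UDF these members would sit in the EvalFunc
// subclass, and exec(Tuple) would call lookup(...) instead of scanning a bag.
final class CsvLookup {
    private final Path csvPath;          // local name of the cached or shipped file
    private Map<String, String[]> rows;  // stays null until the first lookup
    private int loadCount = 0;           // only here to show the file is read once

    CsvLookup(String localPath) {
        this.csvPath = Paths.get(localPath);
    }

    // Parse the CSV on first use only; later calls return immediately.
    private void loadIfNeeded() throws IOException {
        if (rows != null) {
            return;
        }
        Map<String, String[]> loaded = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(csvPath, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;  // skip blank lines
                }
                String[] cols = line.split(",", -1);
                for (int i = 0; i < cols.length; i++) {
                    cols[i] = cols[i].trim().toLowerCase(Locale.ROOT);
                }
                // Assumption: rows are matched on their first column.
                loaded.put(cols[0], cols);
            }
        }
        rows = loaded;
        loadCount++;
    }

    // Returns "Y" when f2 matches the first column of some CSV row, else "N".
    String lookup(String f2) throws IOException {
        loadIfNeeded();
        if (f2 == null) {
            return "N";
        }
        return rows.containsKey(f2.trim().toLowerCase(Locale.ROOT)) ? "Y" : "N";
    }

    int getLoadCount() {
        return loadCount;
    }
}
```

With something like this, the CSV is read once per task rather than once per source row, and each row costs a hash lookup instead of a scan over roughly 150k tuples. If the real logic is not an equality match on one column, the map would need a different key or a different structure, but the load-once check still applies.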