
FLATTEN on a bag not working as expected


Input: a.csv file containing map data

[banks#{(bofa),(chase)}]

Pig script:

A = LOAD 'a.csv' AS (bank_details:map[]);
B = FOREACH A GENERATE FLATTEN(bank_details#'banks') AS bank_name;

Output of B:

({(bofa),(chase)})

Applying FLATTEN on the bag:

C = FOREACH A GENERATE bank_details#'banks' AS banks: bag{t:(bank:chararray)};
D = FOREACH C GENERATE FLATTEN(banks);

Output of D:

org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing [POProject (Name: Project[bag][0] - scope-114 Operator Key: scope-114) children: null at []]: java.lang.ClassCastException: org.apache.pig.data.DataByteArray cannot be cast to org.apache.pig.data.DataBag
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:366)

Expected output:

(bofa)
(chase)

If the input file contains a bag instead, as below:

Input: a.csv

{(bofa),(chase)}

Pig script:

A = LOAD 'a.csv' AS (bank_details:bag{t:(bank_name:chararray)});
B = FOREACH A GENERATE FLATTEN(bank_details) AS bank_name;

Output of B (the flattened result, as expected):

(bofa)
(chase)

Any idea why we are not able to flatten the bag in aliases C and D?


Solution

  • The problem here is that when you do not specify a schema for a map, its values default to bytearray, as stated in the official documentation. You can confirm this with describe:

    A = LOAD 'a.csv' AS (bank_details:map[]);
    B = FOREACH A GENERATE FLATTEN(bank_details#'banks') AS bank_name;
    describe B;
    B: {bank_name: bytearray}
    

    Therefore, declaring banks: bag{...} in alias C does not convert the value: at run time it is still a DataByteArray, which is why you get a ClassCastException saying that DataByteArray cannot be cast to DataBag. A dump of C still works because you are not doing any real operation on the data, merely projecting it. However, FLATTEN expects to receive a DataBag, so it fails as soon as it tries to cast your bytearray to one.

    The reason it works in your second case is that you declare the schema explicitly as a bag, so the field does not fall back to the default type, bytearray:

    A = LOAD 'a.csv' AS (bank_details:bag{t:(bank_name:chararray)});
    

    EDIT

    Note that in the second case you are not using a map at all; you are loading a bag directly. If you want to use a map, you can, as long as you declare the schema of its values to avoid the problem described above:

    A = LOAD 'a.csv' AS (bank_details:map[{(name:chararray)}]);
    B = FOREACH A GENERATE FLATTEN(bank_details#'banks') AS bank_name;
    
    dump B;
    (bofa)
    (chase)
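
    Alternatively, you may be able to keep the untyped map[] and use an explicit cast instead of only declaring the type with AS. The sketch below assumes the file is read with the default PigStorage loader, whose caster should be able to parse the text form {(bofa),(chase)} into a bag; I have not verified this on every Pig version, so treat it as a starting point rather than a guaranteed fix.

    ```pig
    -- Untyped map: every value, including 'banks', is loaded as bytearray
    A = LOAD 'a.csv' AS (bank_details:map[]);

    -- Explicit cast (not just an AS declaration): asks the loader's caster
    -- to convert the bytearray into a real bag of single-chararray tuples
    C = FOREACH A GENERATE (bag{tuple(chararray)}) bank_details#'banks' AS banks;

    -- banks should now be a DataBag at run time, so FLATTEN can emit one row per tuple
    D = FOREACH C GENERATE FLATTEN(banks) AS bank_name;
    DUMP D;
    ```

    Declaring the value type in the map schema, as shown above, is the simpler option when you control the LOAD statement; the cast is mainly useful when the map schema has to stay untyped.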