Tags: java, loops, apache-spark, optimization, calculated-columns

Iterate over different columns using withColumn in Java Spark


I have to modify a Dataset<Row> according to some rules stored in a List<Row>. I want to iterate over the Dataset<Row> columns using Dataset.withColumn(...), as in the following example:

(import necessary libraries...)

SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.

for (Row row : ListWithInfo) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

            dfToModify = dfToModify.withColumn(field,
                                    when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(conditionAux))
                                    ,output)
                                    .otherwise(dfToModify.col(field)));

        }

The code works as it should, but when there are more than 50 "rules" in the list, the program doesn't finish and this output is shown on the screen:

20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1653
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1650
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1635
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1641
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1645
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1646
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on **************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on ***************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1639
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1649
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1651
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 6
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Removing executor 6 because it has been idle for 60 seconds (new desired total will be 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 6.
20/01/27 17:49:19 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, *********************, 43387, None)
20/01/27 17:49:19 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor
20/01/27 17:49:19 INFO cluster.YarnScheduler: Executor 6 on **************** killed by driver.
20/01/27 17:49:19 INFO spark.ExecutorAllocationManager: Existing executor 6 has been removed (new total is 0)
20/01/27 17:49:20 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:21 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:22 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
.
.
.
.

Is there any way to make this more efficient in Java Spark (without using a for loop or something similar)?


Solution

  • In the end I used the withColumns method of the Dataset<Row> object. This method needs two arguments:

    .withColumns(Seq<String> ColumnsNames, Seq<Column> ColumnsValues);

    The Seq<String> must not contain duplicate column names, so all rules for the same field have to be combined into a single Column.

    The code is as follows:

    
    SparkSession spark = SparkSession
                    .builder()
                    .appName("appname")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
    Dataset<Row> dfToModify = spark.read().table("TableToModify");
    
    List<Row> ListWithInfo = new ArrayList<>();
    
    ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
    ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
    ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
    ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
    .
    .
    .
    // initialize values for fields and conditions
    String field_ant = ListWithInfo.get(0).getString(0).toLowerCase();
    String first_input = ListWithInfo.get(0).getString(1);
    String first_output = ListWithInfo.get(0).getString(2);
    String first_conditionAux = ListWithInfo.get(0).getString(3);
    Column whenColumn = when(dfToModify.col(field_ant).equalTo(first_input)
                    .and(dfToModify.col("conditionAuxField").equalTo(lit(first_conditionAux)))
                    ,first_output);
    
    // lists with the names of the fields and the conditions        
    List<Column> whenColumnList = new ArrayList<>();
    List<String> fieldsNameList = new ArrayList<>();
    
    for (Row row : ListWithInfo.subList(1,ListWithInfo.size())) {
    
                String field = row.getString(0);
                String input = row.getString(1);
                String output = row.getString(2);
                String conditionAux = row.getString(3);
    
               if (field.equals(field_ant)) {
                    // if field equals field_ant, the new condition is chained onto the previous one
                    whenColumn = whenColumn.when(dfToModify.col(field).equalTo(input)
                            .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                            ,output);
                } else {
                    // if field is different from the previous one:
                    // close the conditions for this field
                    whenColumn = whenColumn.otherwise(dfToModify.col(field_ant));
    
                    // add to the lists the field(String) and the conditions (columns)
                    whenColumnList.add(whenColumn);
                    fieldsNameList.add(field_ant);
    
                    // and initialize the conditions for the new field
                    whenColumn = when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                            ,output);
                }
    
                field_ant = field;
    
            }
    
    // add last values
    whenColumnList.add(whenColumn);
    fieldsNameList.add(field_ant);
    
    // transform list to Seq
    Seq<Column> whenColumnSeq = JavaConversions.asScalaBuffer(whenColumnList).seq();
    Seq<String> fieldsNameSeq = JavaConversions.asScalaBuffer(fieldsNameList).seq();
    
    Dataset<Row>  dfModified = dfToModify.withColumns(fieldsNameSeq, whenColumnSeq);
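
The core of the answer above is the grouping step: all rules for one field are merged into a single conditional expression, so each field name appears only once. As a minimal sketch of that idea in plain Java (no Spark dependency), the hypothetical helper below groups the rules by field and renders one SQL CASE expression per field. The class name RuleGrouping, the method buildCaseExpressions and the use of String[] instead of Row are assumptions made for illustration, and the sketch assumes the rule values contain no quotes or backslashes. The resulting strings could presumably be turned into columns with functions.expr(...), but that step is not shown or tested here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: groups rules by target field
// and renders one SQL CASE expression per field.
class RuleGrouping {

    // Each rule is {field, input, output, conditionAux}, mirroring the rows above.
    static Map<String, String> buildCaseExpressions(List<String[]> rules) {
        // LinkedHashMap keeps fields in first-seen order and makes names unique.
        Map<String, StringBuilder> branches = new LinkedHashMap<>();
        for (String[] r : rules) {
            branches.computeIfAbsent(r[0], k -> new StringBuilder())
                    .append(" WHEN `").append(r[0]).append("` = ").append(quote(r[1]))
                    .append(" AND `conditionAuxField` = ").append(quote(r[3]))
                    .append(" THEN ").append(quote(r[2]));
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, StringBuilder> e : branches.entrySet()) {
            // Rows matching no rule keep the field's current value.
            result.put(e.getKey(), "CASE" + e.getValue() + " ELSE `" + e.getKey() + "` END");
        }
        return result;
    }

    // Assumption: values contain no quotes or backslashes, so no escaping is done.
    static String quote(String s) {
        return "'" + s + "'";
    }

    public static void main(String[] args) {
        List<String[]> rules = Arrays.asList(
                new String[] {"field1", "input1", "output1", "conditionAux1"},
                new String[] {"field1", "input2", "output3", "conditionAux3"},
                new String[] {"field2", "input3", "output4", "conditionAux4"});
        buildCaseExpressions(rules)
                .forEach((field, expr) -> System.out.println(field + " -> " + expr));
    }
}
```

Unlike the loop in the answer, which relies on rules for the same field being adjacent in the list, grouping through a map also handles rules that arrive in any order, while keeping their relative order within each field.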