sql · scala · apache-spark · conditional-statements · cdc

How to use Spark-SQL query for implementing conditional statement?


I am using Spark 2.0 and developing in Scala, and I want to implement a conditional statement to populate a table. The purpose is to carry out Change Data Capture (CDC). Please see the pseudocode below for a better understanding.

    val rawDataPartition = "select partition_date from rawDataTableName limit 1"

    if (value_of_rawDataPartition == current_timestamp()) {
      // logic for CDC
    } else {
      // load the empty table, as it is the first-time load
    }

How can I do this?

Edit:

The following piece of code works fine with small tables, but it creates memory issues (shown after the code) and therefore fails when the DataFrame is large (281 columns in my case).

    // function for uniform null values
    def replace(Colm: Column): Column = {
      when(Colm === "null" || Colm === "NULL" || Colm === "" ||
           Colm === "Null" || Colm === "\\N", lit(null)).otherwise(Colm)
    }

    val rawdata_sql = "select " + rawDataTableColumns + ", md5(concat_ws(''," +
      rawDataTableStrColumns + ")) as Hash_Column, current_timestamp as LAST_MDF_DT from " +
      rawDataTableName + " where partition_date='" + folderDate + "'"
    var batch_data = hqlContext.sql(rawdata_sql).toDF()

    batch_data.dtypes.foreach { f =>
      val fName = f._1
      val fType = f._2
      if (fType == "StringType") {
        batch_data = batch_data.withColumn(fName, replace(col(fName)))
      }
    }

Error Produced:

TaskSetManager: Lost task 0.0 in stage 4.0 (TID 2, ICDCA0000020.cn.isn.corpintra.net): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;)Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate" grows beyond 64 KB
/* 001 */ public SpecificPredicate generate(Object[] references) {
/* 002 */   return new SpecificPredicate(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.codegen.Predicate {
/* 006 */   private final Object[] references;
/* 007 */
/* 008 */
/* 009 */
/* 010 */   public SpecificPredicate(Object[] references) {
/* 011 */     this.references = references;
/* 012 */
/* 013 */   }
/* 014 */
/* 015 */   public boolean eval(InternalRow i) {
/* 016 */
/* 017 */     boolean isNull2 = true;
/* 018 */     UTF8String value2 = null;
/* 019 */
/* 020 */
/* 021 */     boolean isNull7 = false;
/* 022 */
/* 023 */     boolean isNull8 = false;
/* 024 */
/* 025 */     Object obj = ((Expression) references[0]).eval(null);
/* 026 */     UTF8String value11 = (UTF8String) obj;
/* 027 */
/* 028 */     boolean isNull12 = i.isNullAt(126);
/* 029 */     UTF8String value12 = isNull12 ? null : (i.getUTF8String(126));
/* 030 */
/* 031 */     boolean isNull13 = i.isNullAt(225);
/* 032 */     UTF8String value13 = isNull13 ? null : (i.getUTF8String(225));
/* 033 */
/* 034 */     boolean isNull14 = i.isNullAt(47);
/* 035 */     UTF8String value14 = isNull14 ? null : (i.getUTF8String(47));
/* 036 */
/* 037 */     boolean isNull15 = i.isNullAt(44);
/* 038 */     UTF8String value15 = isNull15 ? null : (i.getUTF8String(44));
/* 039 */
/* 040 */     boolean isNull16 = i.isNullAt(258);
/* 041 */     UTF8String value16 = isNull16 ? null : (i.getUTF8String(258));
/* 042 */
/* 043 */     boolean isNull17 = i.isNullAt(91);
/* 044 */     UTF8String value17 = isNull17 ? null : (i.getUTF8String(91));
/* 045 */
/* 046 */     boolean isNull18 = i.isNullAt(71);
/* 047 */     UTF8String value18 = isNull18 ? null : (i.getUTF8String(71));
/* 048 */
/* 049 */     boolean isNull19 = i.isNullAt(140);
/* 050 */     UTF8String value19 = isNull19 ? null : (i.getUTF8String(140));
/* 051 */
/* 052 */     boolean isNull21 = i.isNullAt(166);
/* 053 */     long value21 = isNull21 ? -1L : (i.getLong(166));
/* 054 */     boolean isNull20 = isNull21;
/* 055 */     UTF8String value20 = null;
/* 056 */     if (!isNull21) {
/* 057 */       value20 = UTF8String.fromString(
/* 058 */         org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString(value21));
/* 059 */     }
/* 060 */
/* 061 */     boolean isNull23 = i.isNullAt(167);
/* 062 */     long value23 = isNull23 ? -1L : (i.getLong(167));
/* 063 */     boolean isNull22 = isNull23;
...
/* 9893 */         boolean isNull2620 = i.isNullAt(126);
/* 9894 */         UTF8String value2620 = isNull2620 ? null : (i.getUTF8String(126));
/* 9895 */         isNull2599 = isNull2620;
/* 9896 */         value2599 = value2620;
/* 9897 */       }
/* 9898 */       value = (!(isNull2599));
/* 9899 */     }
/* 9900 */     return !false && value;
/* 9901 */   }
/* 9902 */ }

        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        ... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;)Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate" grows beyond 64 KB
        at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
        at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
        at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:10242)
        at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:9939)
        at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:9921)
        at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5081)
        at org.codehaus.janino.UnitCompiler.access$12100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$15.visitLocalVariableAccess(UnitCompiler.java:5065)
        at org.codehaus.janino.Java$LocalVariableAccess.accept(Java.java:3186)
        at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
        at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5077)
        at org.codehaus.janino.UnitCompiler.access$11600(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$15.visitLocalVariableAccess(UnitCompiler.java:5065)
        at org.codehaus.janino.Java$LocalVariableAccess.accept(Java.java:3186)
        at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
        at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5077)
        at org.codehaus.janino.UnitCompiler.access$11600(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$15.visitAmbiguousName(UnitCompiler.java:5060)
        at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3141)
        at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2675)
        at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
        at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
        at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
        at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
        at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
        at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
        at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:993)
        at org.codehaus.janino.UnitCompiler.access$1000(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$4.visitBlock(UnitCompiler.java:935)
        at org.codehaus.janino.Java$Block.accept(Java.java:2012)
        at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
        ...
        at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
        at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
        at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
        at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:887)
        ... 35 more

Driver stacktrace:)
17/06/30 19:28:36 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
... and so on

Solution

  • If you are using DataFrames, you can do it with the when function:

    // call_CDC is a placeholder for whatever expression your CDC logic produces
    dataframe
      .withColumn("newColumn",
        when(col("partition_date") === current_timestamp(), call_CDC).otherwise(lit(""))
      )
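    Note that when produces a per-row column expression, while the question's pseudocode branches once for the whole load. If the branch needs to happen on the driver instead, one option is to collect the single partition value and compare it in plain Scala. This is only a sketch: it reuses hqlContext and rawDataTableName from the question's code and assumes partition_date is stored as a yyyy-MM-dd string.

    ```scala
    // Pull the single partition value back to the driver
    // (hqlContext and rawDataTableName come from the question's code).
    val partitionDate: String = hqlContext
      .sql(s"select partition_date from $rawDataTableName limit 1")
      .collect()
      .headOption
      .map(_.getString(0))
      .getOrElse("")

    // java.time.LocalDate.now.toString yields "yyyy-MM-dd", so the comparison
    // only works if partition_date uses the same format (an assumption here).
    if (partitionDate == java.time.LocalDate.now.toString) {
      // logic for CDC
    } else {
      // first-time load of the empty table
    }
    ```

    Collecting is cheap here because the query returns at most one row; the branch then runs ordinary Scala control flow rather than a column expression.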
    

    Edited

    There are some improvements needed in your code, and the foreach with repeated reassignment is creating the memory overhead. Try the following code; it should be lighter than the one you are using:

    import org.apache.spark.sql.types.DataTypes

    // function for uniform null values
    def replace(Colm: Column): Column =
      when(Colm === "null" || Colm === "NULL" || Colm === "" ||
           Colm === "Null" || Colm === "\\N", lit(null)).otherwise(Colm)

    val rawdata_sql = "select " + rawDataTableColumns + ", md5(concat_ws(''," +
      rawDataTableStrColumns + ")) as Hash_Column, current_timestamp as LAST_MDF_DT from " +
      rawDataTableName + " where partition_date='" + folderDate + "'"
    var batch_data = hqlContext.sql(rawdata_sql)

    val toCheckCols = batch_data.schema.filter(f => f.dataType == DataTypes.StringType)
    toCheckCols.foreach { c =>
      batch_data = batch_data.withColumn(c.name, replace(col(c.name)))
    }
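    If you prefer to avoid the var and the per-column reassignment entirely, a foldLeft over the string columns builds the same result immutably. This is a sketch that assumes the replace helper and the batch_data DataFrame defined above:

    ```scala
    // Collect the names of the string-typed columns
    // (batch_data and replace come from the code above).
    val stringCols = batch_data.schema.fields
      .filter(_.dataType == org.apache.spark.sql.types.DataTypes.StringType)
      .map(_.name)

    // Thread the DataFrame through withColumn once per string column,
    // producing a single immutable result.
    val cleaned = stringCols.foldLeft(batch_data) { (df, name) =>
      df.withColumn(name, replace(col(name)))
    }
    ```

    The resulting plan is equivalent, so this alone may not cure the 64 KB codegen error with 281 columns; people sometimes also work around that limit in Spark 2.x by disabling whole-stage codegen (spark.sql.codegen.wholeStage=false) or by splitting the transformation into stages, but treat those as things to verify against your Spark version.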