I am using Spark 2.0 with Scala and want to implement a conditional statement to populate a table. The purpose is to carry out Change Data Capture (CDC). Please see the pseudocode below for a better understanding.
val rawDataPartition = "select partition_date from rawDataTableName limit 1"
if (value_of_rawDataPartition == current_timestamp()) {
  // Logic for CDC
} else {
  // Load the empty table, as it is the first-time load
}
How can I do this?
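In other words, I expect to pull the single partition value down to the driver and branch there. A rough sketch of what I mean, where the "yyyy-MM-dd" string format of partition_date is an assumption from my setup:

// Sketch: fetch the single partition value to the driver, then branch on it
val rawDataPartition = hqlContext
  .sql("select partition_date from rawDataTableName limit 1")
  .first()
  .getString(0)
val today = new java.text.SimpleDateFormat("yyyy-MM-dd").format(new java.util.Date())
if (rawDataPartition == today) {
  // Logic for CDC
} else {
  // Load the empty table, as it is the first-time load
}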
Edit:
The following piece of code works fine with small tables, but it fails when the DataFrame is wide (281 columns in my case), producing the code-generation error shown after the code.
// Function to normalize the various textual null markers to a real null
def replace(colm: Column): Column =
  when(colm === "null" || colm === "NULL" || colm === "" ||
       colm === "Null" || colm === "\\N", lit(null)).otherwise(colm)
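// Usage on a single (hypothetical) column name, turning the literal
// strings above into real nulls:
// batch_data = batch_data.withColumn("some_col", replace(col("some_col")))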
val rawdata_sql = "select " + rawDataTableColumns +
  ", md5(concat_ws(''," + rawDataTableStrColumns + ")) as Hash_Column" +
  ", current_timestamp as LAST_MDF_DT from " + rawDataTableName +
  " where partition_date='" + folderDate + "'"
var batch_data = hqlContext.sql(rawdata_sql)
// Rewrite every string column through replace(), one withColumn call per column
batch_data.dtypes.foreach { f =>
  val fName = f._1
  val fType = f._2
  if (fType == "StringType") {
    batch_data = batch_data.withColumn(fName, replace(col(fName)))
  }
}
Error Produced:
TaskSetManager: Lost task 0.0 in stage 4.0 (TID 2, ICDCA0000020.cn.isn.corpintra.net): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;)Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate" grows beyond 64 KB
/* 001 */ public SpecificPredicate generate(Object[] references) {
/* 002 */ return new SpecificPredicate(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.codegen.Predicate {
/* 006 */ private final Object[] references;
/* 007 */
/* 008 */
/* 009 */
/* 010 */ public SpecificPredicate(Object[] references) {
/* 011 */ this.references = references;
/* 012 */
/* 013 */ }
/* 014 */
/* 015 */ public boolean eval(InternalRow i) {
/* 016 */
/* 017 */ boolean isNull2 = true;
/* 018 */ UTF8String value2 = null;
/* 019 */
/* 020 */
/* 021 */ boolean isNull7 = false;
/* 022 */
/* 023 */ boolean isNull8 = false;
/* 024 */
/* 025 */ Object obj = ((Expression) references[0]).eval(null);
/* 026 */ UTF8String value11 = (UTF8String) obj;
/* 027 */
/* 028 */ boolean isNull12 = i.isNullAt(126);
/* 029 */ UTF8String value12 = isNull12 ? null : (i.getUTF8String(126));
/* 030 */
/* 031 */ boolean isNull13 = i.isNullAt(225);
/* 032 */ UTF8String value13 = isNull13 ? null : (i.getUTF8String(225));
/* 033 */
/* 034 */ boolean isNull14 = i.isNullAt(47);
/* 035 */ UTF8String value14 = isNull14 ? null : (i.getUTF8String(47));
/* 036 */
/* 037 */ boolean isNull15 = i.isNullAt(44);
/* 038 */ UTF8String value15 = isNull15 ? null : (i.getUTF8String(44));
/* 039 */
/* 040 */ boolean isNull16 = i.isNullAt(258);
/* 041 */ UTF8String value16 = isNull16 ? null : (i.getUTF8String(258));
/* 042 */
/* 043 */ boolean isNull17 = i.isNullAt(91);
/* 044 */ UTF8String value17 = isNull17 ? null : (i.getUTF8String(91));
/* 045 */
/* 046 */ boolean isNull18 = i.isNullAt(71);
/* 047 */ UTF8String value18 = isNull18 ? null : (i.getUTF8String(71));
/* 048 */
/* 049 */ boolean isNull19 = i.isNullAt(140);
/* 050 */ UTF8String value19 = isNull19 ? null : (i.getUTF8String(140));
/* 051 */
/* 052 */ boolean isNull21 = i.isNullAt(166);
/* 053 */ long value21 = isNull21 ? -1L : (i.getLong(166));
/* 054 */ boolean isNull20 = isNull21;
/* 055 */ UTF8String value20 = null;
/* 056 */ if (!isNull21) {
/* 057 */ value20 = UTF8String.fromString(
/* 058 */ org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString(value21));
/* 059 */ }
/* 060 */
/* 061 */ boolean isNull23 = i.isNullAt(167);
/* 062 */ long value23 = isNull23 ? -1L : (i.getLong(167));
/* 063 */ boolean isNull22 = isNull23;
..... (several thousand lines of generated code elided) .....
/* 9893 */ boolean isNull2620 = i.isNullAt(126);
/* 9894 */ UTF8String value2620 = isNull2620 ? null : (i.getUTF8String(126));
/* 9895 */ isNull2599 = isNull2620;
/* 9896 */ value2599 = value2620;
/* 9897 */ }
/* 9898 */ value = (!(isNull2599));
/* 9899 */ }
/* 9900 */ return !false && value;
/* 9901 */ }
/* 9902 */ }
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;)Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:10242)
at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:9939)
at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:9921)
at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5081)
at org.codehaus.janino.UnitCompiler.access$12100(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$15.visitLocalVariableAccess(UnitCompiler.java:5065)
at org.codehaus.janino.Java$LocalVariableAccess.accept(Java.java:3186)
at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5077)
at org.codehaus.janino.UnitCompiler.access$11600(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$15.visitLocalVariableAccess(UnitCompiler.java:5065)
at org.codehaus.janino.Java$LocalVariableAccess.accept(Java.java:3186)
at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5077)
at org.codehaus.janino.UnitCompiler.access$11600(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$15.visitAmbiguousName(UnitCompiler.java:5060)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3141)
at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2675)
at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:993)
at org.codehaus.janino.UnitCompiler.access$1000(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$4.visitBlock(UnitCompiler.java:935)
at org.codehaus.janino.Java$Block.accept(Java.java:2012)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
...
at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:887)
... 35 more
Driver stacktrace:)
17/06/30 19:28:36 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
... and so on
If you are using DataFrames, then you can do it using the when function:
dataframe
  .withColumn("newColumn",
    when(col("partition_date") === current_timestamp(), call_CDC)
      .otherwise(lit("")))
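One caveat, based on an assumption about your schema: if partition_date is a date string such as "yyyy-MM-dd", an === comparison against current_timestamp() will practically never match, because the timestamp carries sub-second precision. A hedged variant that casts and compares dates instead (lit("run CDC") is a placeholder for your actual CDC expression):

import org.apache.spark.sql.functions.{col, current_date, lit, when}

// Sketch, assuming partition_date is stored as a "yyyy-MM-dd" string;
// lit("run CDC") stands in for the real CDC expression.
dataframe.withColumn("newColumn",
  when(col("partition_date").cast("date") === current_date(), lit("run CDC"))
    .otherwise(lit("")))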
Edit:
There are some improvements needed in your code; the foreach is creating the memory overhead. Try the following code, which should be lighter than the one you are using:
import org.apache.spark.sql.types.DataTypes

// Function to normalize the various textual null markers to a real null
def replace(colm: Column): Column =
  when(colm === "null" || colm === "NULL" || colm === "" ||
       colm === "Null" || colm === "\\N", lit(null)).otherwise(colm)
val rawdata_sql = "select " + rawDataTableColumns +
  ", md5(concat_ws(''," + rawDataTableStrColumns + ")) as Hash_Column" +
  ", current_timestamp as LAST_MDF_DT from " + rawDataTableName +
  " where partition_date='" + folderDate + "'"
var batch_data = hqlContext.sql(rawdata_sql)
// Fold over the string columns instead of mutating inside a map
val toCheckCols = batch_data.schema.filter(_.dataType == DataTypes.StringType)
batch_data = toCheckCols.foldLeft(batch_data) { (df, c) =>
  df.withColumn(c.name, replace(col(c.name)))
}
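If the generated code still blows past the 64 KB method limit, one further option, just a sketch and untested against your table, is to build all the replacements in a single select instead of chaining one withColumn per column, so the whole rewrite becomes a single projection:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

// One projection over all columns, replacing only the string-typed ones
val replaced = batch_data.select(batch_data.schema.fields.map { f =>
  if (f.dataType == StringType) replace(col(f.name)).as(f.name)
  else col(f.name)
}: _*)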