spark-streaming  apache-kafka-streams  azure-databricks

Apply function on column values after streaming from Kafka


I need to apply a function to certain columns right after reading a stream from a Kafka topic and before writing it to a landing location or table.

This is done in Azure Databricks.

CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';

select encrypt(123,'key');

import org.apache.spark.sql.functions.from_json

// Read the Kafka topic, parse the JSON value, and flatten it into columns
val streamingSelectDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootStrapServers)
    .option("subscribe", topicName)
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .withColumn("jsonData", from_json($"value", schema))
    .select($"jsonData.*")

The above code creates a function, reads the JSON data from the Kafka stream, and expands it into multiple columns.
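The read snippet refers to a schema value that is not shown in the question. A minimal sketch of what it could look like, assuming all fields are strings; only acntnum appears in the question, and the second field name is purely hypothetical:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema for the JSON payload carried in the Kafka value.
// Only `acntnum` is taken from the question; adjust names and types to the real data.
val schema = StructType(Seq(
  StructField("acntnum", StringType, nullable = true),
  StructField("customer_name", StringType, nullable = true) // hypothetical field
))
```

from_json returns null for a record whose value cannot be parsed against this schema, so a mismatch shows up as null columns rather than as an exception.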

Next, I need to apply the above function to a few columns and transform them before saving the result to a landing location or table.

I get various errors when I try this:

streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
    .option("mergeschema",true)
    .option("checkpointLocation", checkPointPath)
    .format("delta")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .table("raw_data")

command-3006790186109139:29: error: not found: value encrypt
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")

   

Can anyone help me achieve this?


Solution

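  • Solved it by wrapping the call in expr(), so that encrypt is resolved as a SQL function rather than as a Scala identifier: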

    .withColumn("encrypted_acctnum",expr("encrypt(acctnum, 'b1')"))
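The likely reason this works: CREATE FUNCTION registers encrypt in the SQL function catalog, not as a Scala value, so the Scala compiler cannot resolve encrypt(...), while a string passed to expr is parsed as SQL and looked up in that catalog. Below is a sketch of the full write under that assumption. It reuses streamingSelectDF, checkPointPath, and the acntnum column spelling from the question; encryptedDF is a name introduced here for illustration.

```scala
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger

// Assumes `encrypt` was registered with CREATE FUNCTION as in the question,
// and that streamingSelectDF and checkPointPath are already defined.
val encryptedDF = streamingSelectDF
  // expr() parses the string as SQL, so the catalog function is resolved by Spark,
  // not by the Scala compiler.
  .withColumn("encrypted_col", expr("encrypt(acntnum, 'b1')"))

encryptedDF.writeStream
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .format("delta")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  // toTable is the DataStreamWriter method in open-source Spark 3.1+;
  // the question uses .table(...) instead.
  .toTable("raw_data")
```

To encrypt several columns, the same withColumn call can be repeated with a different column name inside the expr string.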