i am new to spark. We are currently building a pipeline :
So, my problem is when i want to use spark-redis library it performs very well, but data stays static in my streaming job.
Although data is refreshed at Redis, it does not reflect to my dataframe. Spark reads data at first then never updates it. Also i am reading from REDIS data at first,total data about 1mio key-val string.
What kind of approaches/methods i can do, i want to use Redis as in-memory dynamic lookup. And lookup table is changing almost 1 hour.
Thanks.
used libraries: spark-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar
Here is the code part:
import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events")
val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
.alias("C"))
import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*")
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )
val curated_df = my_raw_events_df
.select(
...
$"C.SystemEntryDate".alias("RecordCreateDate")
,$"C.Profile".alias("ProfileCode")
,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
,$"C.IdentityType"
,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")
...
).as("C")
import org.apache.spark.sql.streaming.Trigger
val query = curated_df
.select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("topic", "curated-event")
.option("checkpointLocation","/user/spark/checkPointLocation/xyz")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()
One option is to not use spark-redis, but rather lookup in Redis directly. This can be achieved with df.mapPartitions
function. You can find some examples for Spark DStreams here https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/. The idea for Structural Streaming is similar. Be careful to handle the Redis connection properly.