I have a table with the structure below.
trans_count
start_time   end_time     count
00:00:01     00:00:10     1000
00:00:11     00:00:20     800
Spark listens for events from Kafka, groups them into 10-second windows, and has to insert the results into a Phoenix (HBase) table.
After every 10-second window, I first have to check whether the start_time, end_time combination is already in the table. If it is, I have to add the new count to the existing count and upsert the result:
UPSERT INTO trans_count(start_time, end_time, count) SELECT start_time, end_time, count + ? FROM trans_count WHERE start_time = ? AND end_time = ?
If no rows are upserted by the statement above, I simply upsert the new data directly.
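For reference, a minimal JDBC sketch of that check-and-increment logic could look like the following. The table and column names come from the question; the class name, the upsertCount helper, and the commit handling are assumptions, not the actual implementation.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    class TransCountUpsert {
        // Add newCount to an existing (start_time, end_time) row,
        // or insert a fresh row if that window is not in the table yet.
        static void upsertCount(Connection conn, String startTime,
                                String endTime, long newCount) throws SQLException {
            String increment =
                "UPSERT INTO trans_count(start_time, end_time, count) " +
                "SELECT start_time, end_time, count + ? FROM trans_count " +
                "WHERE start_time = ? AND end_time = ?";
            try (PreparedStatement ps = conn.prepareStatement(increment)) {
                ps.setLong(1, newCount);
                ps.setString(2, startTime);
                ps.setString(3, endTime);
                if (ps.executeUpdate() == 0) {
                    // No existing row for this window: upsert the data as-is.
                    String insert =
                        "UPSERT INTO trans_count(start_time, end_time, count) VALUES (?, ?, ?)";
                    try (PreparedStatement ins = conn.prepareStatement(insert)) {
                        ins.setString(1, startTime);
                        ins.setString(2, endTime);
                        ins.setLong(3, newCount);
                        ins.executeUpdate();
                    }
                }
            }
            conn.commit(); // Phoenix connections do not auto-commit by default
        }
    }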
In Apache Storm, I was able to create a Phoenix Connection object in the configure method and reuse that same connection every 10 seconds for the UPSERT.
In Spark, I could not create a single connection object and reuse it for every object in the RDD. My output from Spark is a JavaDStream of Maps in which start_time, end_time, and count are all keys.
I end up creating a connection object for every RDD iteration, which does not feel right. I have read that Phoenix connections are lightweight, but creating a connection object per RDD still seems wrong.
I have read some blogs on related topics, but could not get this going. Kindly help.
NOTE: The application is built in Java.
Solution:
Instead of creating a connection per object in the RDD, create a connection per partition of the RDD and reuse it for all the objects in that partition (see the sketch below).
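A minimal sketch of that pattern, assuming the DStream carries Maps keyed by start_time, end_time, and count as in the question; the class name, the Phoenix JDBC URL, and the TransCountUpsert.upsertCount helper from the earlier sketch are placeholders to adapt:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Map;
    import org.apache.spark.streaming.api.java.JavaDStream;

    class PhoenixSink {
        static void write(JavaDStream<Map<String, String>> stream) {
            stream.foreachRDD(rdd ->
                rdd.foreachPartition(records -> {
                    // One Phoenix connection per partition, reused for every record in it.
                    try (Connection conn =
                             DriverManager.getConnection("jdbc:phoenix:<zookeeper-quorum>")) {
                        while (records.hasNext()) {
                            Map<String, String> r = records.next();
                            TransCountUpsert.upsertCount(conn,
                                    r.get("start_time"),
                                    r.get("end_time"),
                                    Long.parseLong(r.get("count")));
                        }
                    }
                })
            );
        }
    }

Because foreachPartition runs once per partition on the executor, the connection cost is amortized over all the records in that partition instead of being paid per record or per RDD.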