I am trying to pull incremental data from a Greenplum database using Spark. The incremental data for each table comes with a key called transactionId. Each transactionId can contain one row or many rows. All of these are stored in a metadata table, incKeyTable.
We also keep the last moved transactionId of every table in another metadata table, incKeyLoads. That table contains one entry per table: the last transactionId loaded into the production table.
To find the incremental transactionIds for each table, I came up with the logic below.
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("yarn").enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
import spark.implicits._

Class.forName("org.postgresql.Driver").newInstance()

// Last successfully loaded transactionId per table
val tableStatus = "select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url")
  .option("dbtable", s"(${tableStatus}) as LoadedData")
  .option("user", "user").option("password", "pwd").load()
val lutransIdTableMap = tableMetaDF.map(r => (r.getString(0), r.getLong(1))).collect().toMap
Now I have the last updated transaction ID of every table in a Scala Map, as below:
lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)
To find the latest transactionIds (the incremental data) coming from Greenplum, I have written the following logic to query the metadata table incKeyTable:
Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
val returnMsg = "Full loads on this table"
val count = incTransIds.where($"load_type" === "FULLLOAD").count
if(count == 0) {
incTransIds.createOrReplaceTempView("incTransID")
val execQuery = s"SELECT transactionId from incTransID order by transactionId desc"
val incLogIdDf = spark.sql(execQuery)
incLogIdDf.show
val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
pushLogIds
} else {
println("Full load count is greater than zero..")
returnMsg
}
}
var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
val tablename = keyTable.split("\\.") // Tablename = schema.tablename
val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
}
)
This method works, but it takes so long that I could pull the entire table from Greenplum before the search completes, because the dataframe cdf is huge. I tried caching cdf, but it contains nearly 5 million rows and I was advised not to cache such a big table. I can't think of another way to make this search faster. Could anyone suggest how to make this an efficient process?
The code in the question cannot be what you are actually running, because in sortLogIds you return pushLogIds, which is never defined, and you select from schema.tablename as opposed to from s"schema.$tablename". This makes it difficult to know for sure what's going on.
That said, from the standpoint of big data processing patterns, there are several potential problems with your approach:
Iteration instead of UNION transformations. Other things being equal, rather than issuing many separate queries and then assembling the results on the driver, it is preferable to think of ways to issue a single query. This is how the optimizer has a chance to help. In your case, consider creating a Greenplum view that combines all the tables that are in lutransIdTableMap.
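For illustration, here is a rough sketch of a single pushed-down metadata query instead of one JDBC read per table; the column and table names are copied from the question, so treat it as an outline rather than a drop-in replacement:
// One JDBC read covering all tables at once, instead of one read per table
val tableFilter = lutransIdTableMap.map { case (table, lastId) =>
  val tbl = table.split("\\.")(1) // the question's keys look like schema.tablename
  s"(target_table = '$tbl' and transactionId > $lastId)"
}.mkString(" or ")
val pendingQuery = "select transactionId, load_type, source_system, target_table " +
  s"from schema.incKeyTable where source_system = 'DB2' and ($tableFilter)"
val pendingTransIds = spark.read.format("jdbc")
  .option("url", "url")
  .option("dbtable", s"($pendingQuery) as pendingIds")
  .option("user", "user").option("password", "pwd")
  .load()
The single result can then be split by target_table on the Spark side, or joined directly against the data tables, and Greenplum gets one query it can optimize as a whole.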
Actions instead of join transformations. In sortLogIds you are executing a count action just to decide whether to run additional queries. Other things being equal, it's better to express this via a join transformation in order to delay running the action. Later you issue a show, which under the covers is equivalent to take(n). Is this action really necessary? Later still you use collect in order to generate a SQL expression to use in an IN operator. This is another example where you should be using a join instead. All in all, you are executing the same Greenplum base query, represented by incTransIds, three times. If you insist on this type of processing, you should absolutely persist incTransIds in some way.
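For example, a minimal persist sketch, reusing the names from the question (cdf and load_type), would be:
import org.apache.spark.storage.StorageLevel

// Persist once, reuse for the count, the show and the collect, then release.
// MEMORY_AND_DISK spills to disk if the metadata does not fit in memory.
val incTransIds = cdf.persist(StorageLevel.MEMORY_AND_DISK)
val hasFullLoad = incTransIds.where($"load_type" === "FULLLOAD").count > 0
// ... run the remaining steps against incTransIds ...
incTransIds.unpersist()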
SQL assembly instead of DSL use. As a rule, if you are using Spark via a programming language and not via SparkSQL, you should be using the DSL instead of assembling SQL expressions as strings. This way, you wouldn't need to redefine views, etc.
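For example, the sorted-transactionId step in sortLogIds could be expressed with the DSL instead of a temp view plus a SQL string (a sketch using the column name from the question):
import org.apache.spark.sql.functions.col

// Equivalent of createOrReplaceTempView + "SELECT transactionId from incTransID order by transactionId desc"
val incLogIdDf = incTransIds
  .select(col("transactionId"))
  .orderBy(col("transactionId").desc)
The FULLLOAD filter and the later IN list can be expressed the same way, so no temp views or hand-assembled SQL strings are needed.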
There is too much to fix here without having the full code and without knowing the exact Greenplum schema + distribution strategy + indexes (if any) and the data sizes involved. However, the above should give you a starting point.
Here is an example of how to switch from using iteration to unions.
val allData = Map("table1" -> 101, "table2" -> 212)
.map { case (tableName, id) =>
spark.table(tableName).withColumn("id", lit(id))
}
.reduceLeft(_ union _)
Here is an example of how to use joins instead of collect + IN.
val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action
The above example also shows how you can use datasets with collect, replacing, for example, .collect().map(_.getInt(0).toString) with .as[Int].collect.map(_.toString) (or .as[String] after casting the column), which is simpler, safer and faster.
Hope this helps!