I'm trying to use Cassandra as a key-value lookup store in some of our Spark jobs.
We use Dataframes primarily and have moved away from the RDD APIs.
Instead of joining with the tables, either loading them into Spark or
pushing the join down to Cassandra and taking measures to avoid large
table scans, I thought I could just write a Spark UDF that connects
to Cassandra and looks up one key.
I additionally want to convert the result row into a case class object and return the object.
I got some of this information from the responses to the question "Spark Cassandra Connector proper usage": withSessionDo reuses an underlying JVM-level Session that is available on each node.
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sparkConf) // I know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefinedFunction = udf((key: String) => {
  connector.withSessionDo { session =>
    val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
    val row = session.execute(stmt.bind(key)).one() // execute returns a ResultSet; one() gives the single row
    MyCaseClass(
      field1 = row.getString(0),
      field2 = row.getInt(1)
      ...
    )
  }
})
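For context, I would apply the UDF to one of our dataframes roughly like this (just a sketch; df, its key column, and the keyspace/table names are placeholders):

import org.apache.spark.sql.functions.col

// sketch: df, its "key" column, and the keyspace/table names are placeholders
val enriched = df.withColumn("lookup_result",
  lookupKey(connector, "my_keyspace", "my_table")(col("key")))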
Session isn't serializable, so we cannot create one outside the UDF and pass it in, which rules out using a MappingManager created up front to convert the rows to case class instances. An alternative approach using MappingManager:
import com.datastax.driver.mapping.MappingManager

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefinedFunction = udf((key: String) => {
  connector.withSessionDo { session =>
    // Session isn't serializable, so creating the MappingManager outside and passing it
    // into the UDF is not an option, even if we were willing to do the session management ourselves.
    val manager = new MappingManager(session)
    val mapper = manager.mapper(classOf[MyCaseClass], keyspace)
    mapper.get(key)
  }
})
I am new to Cassandra, so please bear with me on a few questions.
You're trying to emulate what the Spark Cassandra Connector (SCC) does under the hood, but your implementation will be much slower than SCC's because you're using the synchronous API and fetching rows one after another, while SCC uses the asynchronous API and pulls data for multiple rows in parallel.
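For illustration only, the difference looks roughly like this with the Java driver 3.x API that your code already uses: fire the requests asynchronously first, then collect the results, so the queries overlap instead of running strictly one after another. This is just a sketch and still an emulation (the keyspace, table, and MyCaseClass fields are placeholders); I wouldn't recommend it over the approach below.

import com.datastax.spark.connector.cql.CassandraConnector

// sketch: asynchronous per-key lookups; keyspace/table/case class fields are placeholders
def lookupKeys(connector: CassandraConnector, keyspace: String, table: String,
               keys: Seq[String]): Seq[Option[MyCaseClass]] =
  connector.withSessionDo { session =>
    val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
    // issue all queries without waiting for each answer ...
    val futures = keys.map(k => session.executeAsync(stmt.bind(k)))
    // ... and only then block for the results
    futures.map { f =>
      Option(f.getUninterruptibly.one()).map(row =>
        MyCaseClass(field1 = row.getString(0), field2 = row.getInt(1)))
    }
  }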
The best way to achieve what you want is to use the Cassandra-optimized join (often called the "direct join"). This kind of join has always been available in the RDD API, but for a long time it was available in the Dataframe API only in the commercial version of the connector. Since SCC 2.5.0 (released in May 2020), this functionality is also available in the open source version, so you can use it instead of building an emulation of it.

The direct join is performed only when you enable the special Catalyst extensions, by passing spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions when configuring the SparkSession (for example, via the command line). After that, you can join with a Cassandra table by the full or partial primary key, and SCC will automatically convert the join into individual requests to Cassandra that are executed very efficiently. You can check that this happens by calling explain on the joined dataframe; you should see something like this (look for the string Cassandra Direct Join):
scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
+- *(1) Range (1, 5, step=1, splits=8)
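For reference, a session configuration and a join that produce a plan like the one above might look roughly like this (a sketch; the host is a placeholder, and the keyspace test, table jtest1, and primary key (pk, c1) are the ones from the plan above, so substitute your own):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// sketch: enable the Catalyst extensions when building the session
// (or pass the same setting via --conf on the command line)
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
  .getOrCreate()

// Cassandra table test.jtest1 with primary key (pk, c1), as in the plan above
val jtest1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "jtest1"))
  .load()

// keys to look up; joining on the full primary key lets SCC turn the join
// into individual requests instead of a scan of the whole table
val toJoin = spark.range(1, 5)
  .select(col("id").cast("int").as("id"), col("id").cast("int").as("cc1"))

val joined = toJoin.join(jtest1,
  toJoin("id") === jtest1("pk") && toJoin("cc1") === jtest1("c1"))

joined.explain   // should print "Cassandra Direct Join" as shown above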
I recently wrote a long blog post that explains how to perform effective joins with data in Cassandra using both Dataframe & RDD APIs - I don't want to repeat it here :-)