Tags: apache-spark, ignite

Persisting Spark DataFrame to Ignite


I want to persist a Spark DataFrame to Ignite. While exploring, I came across ignite-spark, which helps with this, but currently ignite-spark works only with Spark 2.3, not Spark 2.4.

So I fell back to the traditional JDBC approach of

df.write.format("jdbc")

Now my code looks like this:

df.write
     .format("jdbc")
     .option("url", "jdbc:ignite:thin://127.0.0.1:10800")
     .option("dbtable", "sample_table")
     .option("user", "ignite")
     .option("password", "ignite")
     .mode(SaveMode.Overwrite)
     .save()

The problem I'm facing now is that my DataFrame has no primary key, which is mandatory for Ignite table creation. Kindly suggest how to overcome this issue.

Error stack trace below:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.sql.SQLException: No PRIMARY KEY defined for CREATE TABLE
    at org.apache.ignite.internal.jdbc.thin.JdbcThinConnection.sendRequest(JdbcThinConnection.java:750)
    at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.execute0(JdbcThinStatement.java:212)
    at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.executeUpdate(JdbcThinStatement.java:340)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.ev.spark.job.Ignite$.delayedEndpoint$com$ev$spark$job$Ignite$1(Ignite.scala:52)
    at com.ev.spark.job.Ignite$delayedInit$body.apply(Ignite.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.ev.spark.job.Ignite$.main(Ignite.scala:9)
    at com.ev.spark.job.Ignite.main(Ignite.scala)

Edit:

I'm looking for a way to create the table on the fly before persisting the DF. My DF already contains one or more fields that I need to somehow tell Spark to use as the primary key when the table is created.


Solution

  • Try creating the underlying Ignite table beforehand with Ignite DDL. Define some primary key, such as id. Then use the Spark API to connect to Ignite and write into this pre-created table. Increment id manually before passing the DataFrame to the writer; for instance, this Ignite API can be used for unique ID generation. A sketch of this approach is shown after this answer.

    As for the unsupported Spark 2.4 version, I've opened a ticket with the Ignite community. Hopefully the ticket will be taken into the 2.7.6 release scheduled for August.
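
    Below is a minimal Scala sketch of the approach above. It is an illustration, not a drop-in fix: the table name, columns, cache template, and the local Spark session/toy DataFrame are assumptions to adapt to your setup.

        import java.sql.DriverManager

        import org.apache.spark.sql.{SaveMode, SparkSession}

        object IgnitePersistSketch extends App {
          // Assumed local Spark session and toy DataFrame; replace with your own.
          val spark = SparkSession.builder()
            .appName("ignite-persist")
            .master("local[*]")
            .getOrCreate()
          import spark.implicits._
          val df = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")

          val url = "jdbc:ignite:thin://127.0.0.1:10800"

          // Step 1: pre-create the table with an explicit PRIMARY KEY using Ignite DDL
          // over the thin JDBC driver, so Spark never has to issue CREATE TABLE itself.
          // The explicit driver registration may be unnecessary on JDBC 4+ classpaths.
          Class.forName("org.apache.ignite.IgniteJdbcThinDriver")
          val conn = DriverManager.getConnection(url, "ignite", "ignite")
          try {
            conn.createStatement().execute(
              """CREATE TABLE IF NOT EXISTS sample_table (
                |  id BIGINT PRIMARY KEY,
                |  name VARCHAR
                |) WITH "template=partitioned"
                |""".stripMargin)
          } finally {
            conn.close()
          }

          // Step 2: append into the existing table. SaveMode.Overwrite would drop the
          // table and let Spark recreate it without a primary key, which is exactly
          // what produces "No PRIMARY KEY defined for CREATE TABLE".
          df.write
            .format("jdbc")
            .option("url", url)
            .option("dbtable", "sample_table")
            .option("user", "ignite")
            .option("password", "ignite")
            .mode(SaveMode.Append)
            .save()

          spark.stop()
        }

    If the DataFrame has no natural key, one option is to derive one on the Spark side (for example with monotonically_increasing_id()) or on the Ignite side with an atomic sequence, as suggested above.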