Tags: apache-spark, ignite

Apache Ignite Spark Integration not working with Schema Name


I'm using the Apache Ignite Spark connector (ignite-spark 2.7.5) to persist a DataFrame to an Ignite table with the code below.

val ignite = Ignition.start(CONFIG)

catalog_opportunities_agg.write
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_TABLE, "s1.club")
  .option("user", "ignite")
  .option("password", "ignite")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "club_id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .mode(SaveMode.Overwrite)
  .save()

Ignition.stop(false)

The code works fine against the public schema (i.e. without specifying a schema name), but it starts to fail as soon as I add the schema name (s1).

Error stack:

19/09/04 10:24:06 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 208) 
java.util.NoSuchElementException: None.get 
        at scala.None$.get(Option.scala:347) 
        at scala.None$.get(Option.scala:345) 
        at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:155) 
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:117) 
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:116) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) 
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
        at org.apache.spark.scheduler.Task.run(Task.scala:121) 
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
        at java.lang.Thread.run(Thread.java:748) 
19/09/04 10:24:06 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 202) 
java.util.NoSuchElementException: None.get 
        ... (same stack trace as above, repeated for tasks 1.0, 5.0, and others)

Kindly suggest what I'm doing wrong.


Solution

  • I think the connector doesn't understand the `schema.table` syntax in `OPTION_TABLE`. Instead of:

    .option(OPTION_TABLE, "s1.club") 
    

    try:

    .option(OPTION_SCHEMA, "s1") 
    .option(OPTION_TABLE, "club") 
    

    Note that, as long as the table name is unique across schemas, you shouldn't need to specify the schema at all. The `OPTION_SCHEMA` documentation says:

    If this is not specified, all schemata will be scanned for a table name which matches the given table name and the first matching table will be used. This option can be used when there are multiple tables in different schemata with the same table name to disambiguate the tables.
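
    Putting it together, the write from the question would look something like the sketch below. This is not tested end to end; it assumes the same DataFrame (`catalog_opportunities_agg`), the same `CONFIG` constant, and a running Ignite cluster as in the question, and it relies on `OPTION_SCHEMA` from `IgniteDataFrameSettings` (available in ignite-spark 2.7.x):

    import org.apache.ignite.Ignition
    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.SaveMode

    val ignite = Ignition.start(CONFIG)

    catalog_opportunities_agg.write
      .format(FORMAT_IGNITE)
      .option(OPTION_CONFIG_FILE, CONFIG)
      .option(OPTION_SCHEMA, "s1")    // schema passed as its own option...
      .option(OPTION_TABLE, "club")   // ...and the bare table name, no "s1." prefix
      .option("user", "ignite")
      .option("password", "ignite")
      .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "club_id")
      .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
      .mode(SaveMode.Overwrite)
      .save()

    Ignition.stop(false)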