Getting `Mispartitioned tuple in single-partition insert statement` while trying to insert data into a partitioned table using `TABLE_NAME.insert`


I am creating a VoltDB table with the following CREATE TABLE statement:

CREATE TABLE EMPLOYEE (
    ID VARCHAR(4) NOT NULL,
    CODE VARCHAR(4) NOT NULL,
    FIRST_NAME VARCHAR(30) NOT NULL,
    LAST_NAME VARCHAR(30) NOT NULL,
    PRIMARY KEY (ID, CODE)
);

I then partition the table with:

PARTITION TABLE EMPLOYEE ON COLUMN ID;

I have written a Spark job that inserts records into VoltDB using the Scala code below. The code works well if the table is not partitioned:

import org.voltdb._;
import org.voltdb.client._;
import scala.collection.JavaConverters._

val voltClient:Client = ClientFactory.createClient();
voltClient.createConnection("IP:PORT");

val empDf = spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("sep", ",")
          .load("/FileStore/tables/employee.csv")

// Code to convert scala seq to java varargs
def callProcedure(procName: String, parameters: Any*): ClientResponse =
    voltClient.callProcedure(procName, paramsToJavaObjects(parameters: _*): _*)

def paramsToJavaObjects(params: Any*) = params.map { param ⇒
    val value = param match {
      case None    ⇒ null
      case Some(v) ⇒ v
      case _       ⇒ param
    }
    value.asInstanceOf[AnyRef]
}

empDf.collect().foreach { row =>
  callProcedure("EMPLOYEE.insert", row.toSeq:_*);
}

But I get the following error if I partition the table:

Mispartitioned tuple in single-partition insert statement.
Constraint Type PARTITIONING, Table CatalogId EMPLOYEE
Relevant Tuples:
ID  CODE  FIRST_NAME  LAST_NAME 
--- ----- ----------- ----------
1   CD01  Naresh       "Joshi"
    at org.voltdb.client.ClientImpl.internalSyncCallProcedure(ClientImpl.java:485)
    at org.voltdb.client.ClientImpl.callProcedureWithClientTimeout(ClientImpl.java:324)
    at org.voltdb.client.ClientImpl.callProcedure(ClientImpl.java:260)
    at line4c569b049a9d4e51a3e8fda7cbb043de32.$read$$iw$$iw$$iw$$iw$$iw$$iw.callProcedure(command-3986740264398828:9)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-3986740264399793:8)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-3986740264399793:7)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

I found a link (https://forum.voltdb.com/forum/voltdb-discussions/building-voltdb-applications/1182-mispartitioned-tuple-in-single-partition-insert-statement) about the problem and tried to partition the procedure using the statement below:

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID;

and

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID [PARAMETER 0];

But both statements fail with the error [Ad Hoc DDL Input]: VoltDB DDL Error: "Partition references an undefined procedure "EMPLOYEE.insert"".

I am able to insert the data using the @AdHoc procedure, but I cannot figure out the cause of, or a solution to, the problem above, where I use the EMPLOYEE.insert procedure to insert data into a partitioned table.


Solution

  • The procedure "EMPLOYEE.insert" is what is called a "default" procedure: VoltDB generates it automatically when you create the table EMPLOYEE. It is already partitioned according to the partitioning of the table, so you cannot use "PARTITION PROCEDURE EMPLOYEE.insert ..." to override this.

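    I think what is happening is this: the procedure is partitioned by the ID column, which is a VARCHAR in the EMPLOYEE table, so the input parameter should be a String. However, I think your code is reading the CSV file and passing in the first column as an int value. With the "inferSchema" option set to "true", Spark infers a numeric type for a column whose values all look like numbers, such as the ID value 1 in your error output.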

    The Java client method callProcedure(String procedureName, Object... params) accepts varargs for the parameters, so it can take any Object[]. Somewhere along the way the server checks that the number of arguments matches the number the procedure expects; if it does not, the call is rejected and the procedure is never executed. In your case I think the number of arguments is correct, so the server goes on to execute the procedure. It hashes the first parameter value, which corresponds to ID, to determine which partition the call should go to, and routes the invocation to that partition. When the procedure executes, it tries to insert the values, but a second check verifies that the partition key value is correct for this partition, and that check is failing.

    I think that if the value is passed in as an int, it is hashed to the wrong partition. In that partition the procedure then tries to insert the value into the VARCHAR column, probably converting the int to a String implicitly. Because the resulting String does not belong to that partition, the insert fails with the error "Mispartitioned tuple in single-partition insert statement." This is the same error you would get if you wrote a Java stored procedure and configured the wrong column as the partition key.

    Disclosure: I work at VoltDB.
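
If the diagnosis above is right, one possible fix is to make sure every parameter reaches `EMPLOYEE.insert` as a String, since all four EMPLOYEE columns are VARCHAR. The following is a minimal sketch under that assumption, not a confirmed solution. `VarcharParams` and `toVarcharParams` are hypothetical names introduced here for illustration and are not part of the VoltDB client API; `callProcedure` and `empDf` in the trailing comment are the names from the question.

```scala
// Hypothetical helper (not part of the VoltDB client API).
// Assumption: every column of the target table is a VARCHAR, as in EMPLOYEE,
// so each parameter can safely be sent as its String representation.
object VarcharParams {
  def toVarcharParams(values: Seq[Any]): Seq[AnyRef] = values.map {
    case null | None => null       // keep missing values as null
    case Some(v)     => v.toString // unwrap Option, as the question's code does
    case v           => v.toString // e.g. the Int 1 becomes the String "1"
  }
}

// Possible usage with the names from the question
// (needs Spark and a live VoltDB connection, so it is left as a comment):
// empDf.collect().foreach { row =>
//   callProcedure("EMPLOYEE.insert", VarcharParams.toVarcharParams(row.toSeq): _*)
// }
```

Another option is to leave schema inference off when reading the file: with `inferSchema` set to `false`, which is Spark's default for CSV, every column is read as a string, so the ID value is never turned into an int in the first place.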