Tags: dataset, key-value, google-cloud-data-fusion, cdap

Writing a key to a KeyValueTable fails


I've developed and successfully deployed a custom batch source plugin for the CDAP platform on Google Data Fusion. The plugin sometimes works in preview mode but always fails when I deploy the pipeline, with the following error:

org.apache.tephra.TransactionFailureException: Unable to persist changes of transaction-aware 'RecordGenerator' for transaction 1574271280330000000
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:260) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.finish(AbstractTransactionContext.java:115) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.finishExecute(Transactions.java:230) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.execute(Transactions.java:211) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:546) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:534) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.BasicSparkClientContext.execute(BasicSparkClientContext.java:333) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.etl.common.submit.SubmitterPlugin.prepareRun(SubmitterPlugin.java:69) ~[na:na]
    at io.cdap.cdap.etl.batch.PipelinePhasePreparer.prepare(PipelinePhasePreparer.java:111) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.SparkPreparer.prepare(SparkPreparer.java:104) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.ETLSpark.initialize(ETLSpark.java:112) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:131) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:33) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:167) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:162) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.lambda$initializeProgram$1(AbstractContext.java:640) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:600) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.initializeProgram(AbstractContext.java:637) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.initialize(SparkRuntimeService.java:433) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.startUp(SparkRuntimeService.java:208) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:47) ~[com.google.guava.guava-13.0.1.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$5$1.run(SparkRuntimeService.java:404) [io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
    Suppressed: org.apache.tephra.TransactionFailureException: Unable to roll back changes in transaction-aware 'RecordGenerator' for transaction 1574271280330000000
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:143) ~[na:na]
        ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.undo(LevelDBTableCore.java:184)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undoPersisted(LevelDBTable.java:113)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undo(LevelDBTable.java:108)
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.rollbackTx(BufferingTable.java:368)
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.rollbackTx(AbstractDataset.java:101)
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:141)
    ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.persist(LevelDBTableCore.java:164) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:100) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:92) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.commitTx(BufferingTable.java:351) ~[na:na]
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.commitTx(AbstractDataset.java:91) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:255) ~[na:na]
    ... 22 common frames omitted

The error is misleading, in my opinion, because it originates from the following piece of code inside the plugin:

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    pipelineConfigurer.createDataset("RecordGenerator", KeyValueTable.class);
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
}

@Override
@TransactionPolicy(TransactionControl.IMPLICIT)
public void prepareRun(BatchSourceContext context) throws IOException {
    KeyValueTable d = context.getDataset("RecordGenerator");
    d.write("numberOfRecords", Long.toString(config.numberOfRecords));
    context.setInput(Input.ofDataset("RecordGenerator"));
}

In particular, the offending line is d.write("numberOfRecords", Long.toString(config.numberOfRecords));. If I remove this line the plugin runs, but obviously the transform part of the plugin no longer works.

I am out of ideas: the behaviour seems erratic in preview mode, and the documentation (where it exists) is sparse, to say the least. What can I do to make it work?


Solution

  • The KeyValueTable is not supported in Data Fusion. It worked in preview because preview runs in local mode. If you want to persist something in the prepareRun() method, you will need to use some other storage. An easy alternative is to store the information in a file on GCS. Here is a code snippet you can adapt, which writes information to a file: https://github.com/data-integrations/kafka-plugins/blob/release/2.2/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java#L160-L167
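Adapted from that Kafka plugin snippet, a minimal sketch of what prepareRun() could look like when writing the value to a file via the Hadoop FileSystem API (which resolves gs:// paths on Dataproc). The bucket path and the use of writeUTF are illustrative assumptions, not part of the original question:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Inside prepareRun(): persist the record count to a file instead of a
// KeyValueTable. The bucket/path below is hypothetical; in Data Fusion it
// should point at a GCS bucket the Dataproc cluster can write to.
Path path = new Path("gs://my-bucket/record-generator/numberOfRecords");
Configuration conf = new Configuration();
FileSystem fs = path.getFileSystem(conf);
try (FSDataOutputStream out = fs.create(path, true)) {
    // overwrite=true so re-runs of the pipeline replace the old value
    out.writeUTF(Long.toString(config.numberOfRecords));
} catch (IOException e) {
    throw new RuntimeException("Failed to persist numberOfRecords", e);
}
```

The runtime stage (e.g. the InputFormat) can then read the value back with fs.open(path) and readUTF(), mirroring how the Kafka plugin reads its offsets file.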