Tags: java, scala, bigdata, accumulo, spark-notebook

Odd error when populating Accumulo 1.6 Mutation object via spark-notebook


I'm using spark-notebook to update an Accumulo table, employing the method specified in both the Accumulo documentation and the Accumulo example code. Below is, verbatim, what I put into the notebook, along with the responses:

val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);

clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable = org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed
bwConfig: org.apache.accumulo.core.client.BatchWriterConfig = [maxMemory=52428800, maxLatency=120000, maxWriteThreads=3, timeout=9223372036854775807]
batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736

val rowIdS = rddX2_first._1.split(" ")(0)

rowIdS: String = row_0736460000

val mutation = new Mutation(new Text(rowIdS))

mutation: org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0

mutation.put(
  new Text("foo"), 
  new Text("1"), 
  new ColumnVisibility("exampleVis"), 
  new Value(new String("CHEWBACCA!").getBytes) )

java.lang.IllegalStateException: Can not add to mutation after serializing it
    at org.apache.accumulo.core.data.Mutation.put(Mutation.java:168)
    at org.apache.accumulo.core.data.Mutation.put(Mutation.java:163)
    at org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)

I dug into the code and see that the culprit is a check that throws if the UnsynchronizedBuffer.Writer buffer is null, i.e. if the mutation has already been serialized. The line numbers won't line up because this is a slightly different version than what's in the 1.6 accumulo-core jar; I've looked at both, and the difference isn't one that matters in this case. As far as I can tell, the buffer is created prior to execution of that method and isn't being discarded.
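For reference, the failure mode is straightforward to reproduce in plain Java once serialization has actually happened. Assuming 1.6 behavior, where read-back methods such as getUpdates() trigger the internal serialize() call, a minimal sketch:

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

public class MutationSerializeDemo {
    public static void main(String[] args) {
        Mutation mutation = new Mutation(new Text("row_0000000001"));
        mutation.put(new Text("foo"), new Text("1"),
                new ColumnVisibility("exampleVis"), new Value("first".getBytes()));

        // Reading the mutation back forces the internal serialize() call,
        // which flushes and nulls the UnsynchronizedBuffer.Writer.
        mutation.getUpdates();

        // With the buffer null, the guard in put() fires:
        // java.lang.IllegalStateException: Can not add to mutation after serializing it
        mutation.put(new Text("foo"), new Text("2"),
                new ColumnVisibility("exampleVis"), new Value("second".getBytes()));
    }
}

If something in the notebook pipeline serializes the Mutation between the cell that creates it and the cell that calls put(), it would fail in exactly this way.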

So either I'm missing something in the code, or something else is up. Does anyone know what might be causing this behavior?

UPDATE ONE

I've executed the following code using the Scala console and via straight Java 1.8. It fails in Scala, but not in Java. I'm thinking this is an Accumulo issue at this point. As such, I'm going to open a bug ticket and dig deeper into the source. If I come up with a resolution, I'll post it here.

Below is the code in Java form. There's some extra stuff in there because I wanted to make sure I could connect to the table I created using the Accumulo batch writer example:

import java.util.Map.Entry;

import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

public class App {

    public static void main( String[] args ) throws 
                                            AccumuloException, 
                                            AccumuloSecurityException, 
                                            TableNotFoundException {
        // connect to accumulo using a scanner
        // print first ten rows of a given table
        String instanceNameS = "accumulo";
        String zooServersS = "localhost:2181";
        Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
        Connector connector = 
                instance.getConnector( "root", new PasswordToken("password"));

        Authorizations auths = new Authorizations("exampleVis");
        Scanner scanner = connector.createScanner("batchtestY", auths);

        scanner.setRange(new Range("row_0000000001", "row_0000000010"));

        for(Entry<Key, Value> entry : scanner) {
          System.out.println(entry.getKey() + " is " + entry.getValue());
        }


        // stage up connection info objects for serialization
        ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        BatchWriter batchWriter = 
                connector.createBatchWriter("batchtestY", bwConfig);

        // create mutation object
        Mutation mutation = new Mutation(new Text("row_0000000001"));

        // populate mutation object
        // -->THIS IS WHAT'S FAILING IN SCALA<--
        mutation.put(
                  new Text("foo"), 
                  new Text("1"), 
                  new ColumnVisibility("exampleVis"), 
                  new Value(new String("CHEWBACCA!").getBytes()) );                                           
    }
}

UPDATE TWO

An Accumulo bug ticket has been created for this issue. Their target is to have it fixed in v1.7.0. Until then, the solution I provided below is a functional work-around.


Solution

  • So it seems as though code that works perfectly well with Java doesn't play nice with Scala. The solution (not necessarily a GOOD solution, but a working one) is to create a Java method in a self-contained jar that creates the Mutation object and returns it. That way you can add the jar to Spark's classpath and call the method as needed; see the sketch below. Tested using spark-notebook and was successful in updating an existing Accumulo table. I'm still going to submit a ticket to the Accumulo folks, as this kind of work-around shouldn't be considered 'best practice'.
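For illustration, here is a minimal sketch of what that helper might look like; the MutationHelper class and buildMutation method names are hypothetical, not taken from the actual jar:

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

public class MutationHelper {

    // Creates and fully populates the Mutation on the Java side so the
    // Scala notebook code never has to call put() itself.
    public static Mutation buildMutation(String rowId, String cf, String cq,
                                         String visibility, byte[] value) {
        Mutation mutation = new Mutation(new Text(rowId));
        mutation.put(new Text(cf), new Text(cq),
                new ColumnVisibility(visibility), new Value(value));
        return mutation;
    }
}

From the notebook, the call then becomes batchWriter.addMutation(MutationHelper.buildMutation(rowIdS, "foo", "1", "exampleVis", "CHEWBACCA!".getBytes())), followed by batchWriter.close() to flush the write.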