Tags: apache-spark, spark-streaming, accumulo

Spark streaming + Accumulo - Serialize BatchWriterImpl


I'm looking for Spark Streaming + Accumulo connectors and a full usage example.

Currently I'm trying to write Spark Streaming results to an Accumulo table, but I'm getting a NotSerializableException for the BatchWriter. Can someone point me to examples of how to serialize the BatchWriter? The code below is based on the Accumulo documentation.

Current code:

import org.apache.accumulo.core.client.{BatchWriterConfig, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Mutation, Value}
import com.google.common.primitives.Longs

val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)

val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024 // 32 MB
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
fullMergeResultFlatten.foreachRDD(recordRDD =>
  recordRDD.foreach(record => {
    val mutation = new Mutation(Longs.toByteArray(record.timestamp))
    mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
    mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
    accumuloBatchWriter.addMutation(mutation)
  })
)

At runtime the following error occurs:

17/05/05 16:55:25 ERROR util.Utils: Exception encountered
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

I suppose this is a very common case, but I couldn't find any simple Spark Streaming + Accumulo example.


Solution

  • As elserj pointed out, serializing the connection object is typically not the correct pattern. The pattern I have seen is to initiate the connection from the Spark worker nodes directly, using RDD.foreachPartition(). This is nice because it lets you create one connection per batch of work, as opposed to a new connection for each individual record, which is almost never efficient. A fuller end-to-end sketch follows the example below.

    Example:

    fullMergeResultFlatten.foreachRDD(recordRDD => {
      recordRDD.foreachPartition(partitionRecords => {
        // this connection logic is executed in the Spark workers, so the closure
        // captures only serializable strings, not a driver-side connector
        val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
        val connector = instance.getConnector(accumuloUser, new PasswordToken(accumuloPassword))
        val accumuloBatchWriter = connector.createBatchWriter("Data", new BatchWriterConfig)
        partitionRecords.foreach(record => { /* save operation */ })
        accumuloBatchWriter.close()
      })
    })
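
    Since the question asks for a full usage example, here is a minimal end-to-end sketch of the write path under the same assumptions as above: the record fields from the question (timestamp, value and length, all Long), the "Data" table and connection settings shown earlier, and fullMergeResultFlatten being a DStream of those records. It simply combines the question's mutation code with the per-partition connection pattern and closes the writer in a finally block; treat it as an illustration of the pattern rather than a tested implementation.

    fullMergeResultFlatten.foreachRDD { recordRDD =>
      recordRDD.foreachPartition { partitionRecords =>
        // Runs on the executors: only plain strings are captured by the closure.
        val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
        val connector = instance.getConnector(accumuloUser, new PasswordToken(accumuloPassword))

        val writerConfig = new BatchWriterConfig
        writerConfig.setMaxMemory(32 * 1024 * 1024) // 32 MB buffer, as in the question
        val writer = connector.createBatchWriter("Data", writerConfig)
        try {
          partitionRecords.foreach { record =>
            val mutation = new Mutation(Longs.toByteArray(record.timestamp))
            mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
            mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
            writer.addMutation(mutation)
          }
        } finally {
          writer.close() // close() flushes any buffered mutations
        }
      }
    }

    Creating the connection once per partition amortizes the setup cost over each batch; if that is still too costly, a common refinement is to keep a lazily initialized, per-JVM connector in a Scala object so executors reuse it across batches.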