Search code examples
javacassandranioapache-beamdatastax-java-driver

Exception writing ByteBuffer field into Cassandra


We have an apache beam dataflow job that reads the data from Big Query converts them into POJO before writing into Cassandra using Datastax driver. I recently added a new blob column to the table and added a ByteBuffer field to the POJO.

How I am creating the ByteBuffer

String str = objectMapper.writeValueAsString(installSkuAttributes);
byte[] bytes = str.getBytes( StandardCharsets.UTF_8 );
pojo.setInstallAttributes(ByteBuffer.wrap(bytes));

here is the pipeline snippet public void executePipeline() throws Exception {

Pipeline pipeline = 
Pipeline.create(jobMetaDataBean.getDataflowPipelineOptions());

.... writeDataToCassandra(installSkuData);

pipeline.run();

After making the necessary changes in the DAO writer I am getting the following exception When I ran the job. I am using Datastax driver

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.executePipeline(InstallSkusFullFeed.java:216)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.main(InstallSkusFullFeed.java:221)
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:88)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
Caused by: java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:166)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:52)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • The real problem is here:

    java.io.NotSerializableException: java.nio.HeapByteBuffer

    It's caused by the fact that ByteBuffer isn't serializable, so it couldn't be used for distributed work. You can avoid this by using byte[] directly as your attribute, or implement a wrapper around ByteBuffer that will be serializable.