Search code examples
javagoogle-cloud-dataflowapache-beam

Apache Beam Row Coder issue org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException


Can you please help me with this issue. Is it not possible to convert PCollection of strings into Pcollection of Row ?

Is it not possible to convert Pcollection of String Array into Pcollection of Beam Row ?

I tried String Data type for all the fields in beam schema but it is also giving me same error.

I am using Java 11, Maven 3.8.5 and Apache beam Java SDK 2.41.0

I tried same code with Java 1.8 and Beam 2.40.0 getting same error.

public class beamRowPractise {

    public static void main(String[] args){

        PipelineOptions opts = PipelineOptionsFactory.create();
        opts.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(opts);
        PCollection<String> pc1 = p.apply(TextIO.read().from("data/indata.csv"));
        PCollection<Row> pc2 = pc1.apply(MapElements.via(new mapString())).setRowSchema(getSchema()) ;
        System.out.println(pc2.getSchema().toString());
        p.run();
        }
    public static class mapString extends SimpleFunction<String, Row> {
        @Override

        public  Row apply(String record){
            String arr[] = record.split(",");

            Row.Builder row = Row.withSchema(getSchema()) ;

            row.withFieldValue("name",arr[0]);
            row.withFieldValue("id1",arr[1]);
            row.withFieldValue("id2",arr[2]);
            row.withFieldValue("id3",arr[3]);
            row.withFieldValue("id4",arr[4]);

            return  row.build();

        }
    }

    public  static  Schema getSchema() {
        org.apache.beam.sdk.schemas.Schema.Builder typed_schema_builder = org.apache.beam.sdk.schemas.Schema.builder();
        typed_schema_builder.addField("name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING);
        typed_schema_builder.addField("id1", Schema.FieldType.INT64);
        typed_schema_builder.addField("id2", org.apache.beam.sdk.schemas.Schema.FieldType.INT64);
        typed_schema_builder.addField("id3", org.apache.beam.sdk.schemas.Schema.FieldType.INT64);
        typed_schema_builder.addField("id4", org.apache.beam.sdk.schemas.Schema.FieldType.INT64);
        org.apache.beam.sdk.schemas.Schema typed_beam_schema = typed_schema_builder.build();
        org.apache.beam.sdk.schemas.Schema schema = typed_beam_schema;
        return  schema;
    }
}

Error :

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:374)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at com.bhargav.beamFirst.beamRowPractise.main(beamRowPractise.java:25)
Caused by: java.lang.IllegalStateException
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
    at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:313)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$hZNCN9ub.encode(Unknown Source)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$hZNCN9ub.encode(Unknown Source)
    at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)

Process finished with exit code 1

Solution

  • UPD: You need to chain your builder calls since .withFieldValue() returns Row.FieldValueBuilder, like this:

      public static class mapString extends SimpleFunction<String, Row> {
        @Override
        public Row apply(String record){
          String[] arr = record.split(",");
    
          return Row.withSchema(getSchema())
            .withFieldValue("name", arr[0])
            .withFieldValue("id1", Long.valueOf(arr[1]))
            .withFieldValue("id2", Long.valueOf(arr[2]))
            .withFieldValue("id3", Long.valueOf(arr[3]))
            .withFieldValue("id4", Long.valueOf(arr[4]))
            .build();
        }
      }
    

    As a workaround, you may try to use row.addValue(...) instead and add values in the order defined in your schema, like this:

    row.addValue(arr[0]);
    row.addValue(Long.valueOf(arr[1]));
    row.addValue(Long.valueOf(arr[2]));
    row.addValue(Long.valueOf(arr[3]));
    row.addValue(Long.valueOf(arr[4]));