Can you please help me with this issue? Is it not possible to convert a PCollection of Strings into a PCollection of Rows?
Is it not possible to convert a PCollection of String arrays into a PCollection of Beam Rows?
I also tried using the String data type for all the fields in the Beam schema, but I get the same error.
I am using Java 11, Maven 3.8.5, and the Apache Beam Java SDK 2.41.0.
I tried the same code with Java 1.8 and Beam 2.40.0 and got the same error.
public class beamRowPractise {
public static void main(String[] args){
PipelineOptions opts = PipelineOptionsFactory.create();
opts.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(opts);
PCollection<String> pc1 = p.apply(TextIO.read().from("data/indata.csv"));
PCollection<Row> pc2 = pc1.apply(MapElements.via(new mapString())).setRowSchema(getSchema()) ;
System.out.println(pc2.getSchema().toString());
p.run();
}
public static class mapString extends SimpleFunction<String, Row> {
@Override
public Row apply(String record){
String arr[] = record.split(",");
Row.Builder row = Row.withSchema(getSchema()) ;
row.withFieldValue("name",arr[0]);
row.withFieldValue("id1",arr[1]);
row.withFieldValue("id2",arr[2]);
row.withFieldValue("id3",arr[3]);
row.withFieldValue("id4",arr[4]);
return row.build();
}
}
public static Schema getSchema() {
return Schema.builder()
.addField("name", Schema.FieldType.STRING)
.addField("id1", Schema.FieldType.INT64)
.addField("id2", Schema.FieldType.INT64)
.addField("id3", Schema.FieldType.INT64)
.addField("id4", Schema.FieldType.INT64)
.build();
}
}
Error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:374)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.bhargav.beamFirst.beamRowPractise.main(beamRowPractise.java:25)
Caused by: java.lang.IllegalStateException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:313)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$hZNCN9ub.encode(Unknown Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$hZNCN9ub.encode(Unknown Source)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
Process finished with exit code 1
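UPD: You need to chain your builder calls, since .withFieldValue() returns a Row.FieldValueBuilder rather than modifying the Row.Builder you called it on. In your code the returned builder is discarded, so the values never reach the row you build. The id values also need to be converted to Long to match the INT64 fields in your schema, like this: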
public static class mapString extends SimpleFunction<String, Row> {
@Override
public Row apply(String record){
String[] arr = record.split(",");
return Row.withSchema(getSchema())
.withFieldValue("name", arr[0])
.withFieldValue("id1", Long.valueOf(arr[1]))
.withFieldValue("id2", Long.valueOf(arr[2]))
.withFieldValue("id3", Long.valueOf(arr[3]))
.withFieldValue("id4", Long.valueOf(arr[4]))
.build();
}
}
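To illustrate why the unchained calls in the question lose their values, here is a toy model in plain Java. The classes ToyRowBuilder and ToyFieldValueBuilder are hypothetical stand-ins written for this example; they are not Beam's classes and only mimic the one behaviour that matters here, namely that withFieldValue() hands back a different builder object.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a builder whose withFieldValue() does NOT mutate it.
// This is a simplified illustration, not Beam's Row.Builder.
final class ToyRowBuilder {
    // Returns a new builder that carries the value; this object stays empty.
    ToyFieldValueBuilder withFieldValue(String name, Object value) {
        return new ToyFieldValueBuilder().withFieldValue(name, value);
    }

    // Nothing was ever stored on this builder, so the result is empty.
    Map<String, Object> build() {
        return new LinkedHashMap<>();
    }
}

// Hypothetical stand-in for the builder returned by withFieldValue().
final class ToyFieldValueBuilder {
    private final Map<String, Object> values = new LinkedHashMap<>();

    // Stores the value and returns this builder so calls can be chained.
    ToyFieldValueBuilder withFieldValue(String name, Object value) {
        values.put(name, value);
        return this;
    }

    Map<String, Object> build() {
        return new LinkedHashMap<>(values);
    }
}

final class BuilderChainingDemo {
    // Mirrors the question: each return value is thrown away.
    static Map<String, Object> unchained() {
        ToyRowBuilder row = new ToyRowBuilder();
        row.withFieldValue("name", "alice");
        row.withFieldValue("id1", 1L);
        return row.build();
    }

    // Mirrors the fix: every call operates on the builder returned by the previous one.
    static Map<String, Object> chained() {
        return new ToyRowBuilder()
            .withFieldValue("name", "alice")
            .withFieldValue("id1", 1L)
            .build();
    }

    public static void main(String[] args) {
        System.out.println("unchained: " + unchained()); // prints "unchained: {}"
        System.out.println("chained: " + chained());     // prints "chained: {name=alice, id1=1}"
    }
}
```

In the toy model the unchained version builds an empty result, which is analogous to the question's code producing a Row without values that the coder then fails to encode.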
As a workaround, you can use row.addValue(...) instead and add the values in the order defined in your schema, like this:
row.addValue(arr[0]);
row.addValue(Long.valueOf(arr[1]));
row.addValue(Long.valueOf(arr[2]));
row.addValue(Long.valueOf(arr[3]));
row.addValue(Long.valueOf(arr[4]));
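Both approaches assume that every input line has exactly five comma-separated fields and that the last four parse as long values. A malformed line would otherwise fail inside apply() with an ArrayIndexOutOfBoundsException or NumberFormatException. As a minimal sketch, assuming that layout, the parsing can be pulled into a small plain-Java helper that validates the line and returns the values in schema order. The class name CsvRecordParser and its parse method are hypothetical and not part of Beam, and trimming whitespace is an extra assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: turns one CSV line into values
// ordered like the schema in the question (name, id1, id2, id3, id4).
final class CsvRecordParser {
    static final int EXPECTED_FIELDS = 5;

    static List<Object> parse(String record) {
        // A limit of -1 keeps trailing empty fields, so "a,1,2,3," still yields 5 parts.
        // Note: a plain split does not handle quoted fields that contain commas.
        String[] arr = record.split(",", -1);
        if (arr.length != EXPECTED_FIELDS) {
            throw new IllegalArgumentException(
                "Expected " + EXPECTED_FIELDS + " fields but got " + arr.length + ": " + record);
        }

        List<Object> values = new ArrayList<>(EXPECTED_FIELDS);
        values.add(arr[0].trim()); // "name" is a STRING field

        // The remaining four fields are INT64 in the schema, so they must be Long.
        for (int i = 1; i < EXPECTED_FIELDS; i++) {
            try {
                values.add(Long.valueOf(arr[i].trim()));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "Field " + i + " is not a valid long: '" + arr[i] + "'", e);
            }
        }
        return values;
    }
}
```

Each element of the returned list can then be passed to row.addValue(...) in order. Throwing IllegalArgumentException with the offending line is one possible design; a real pipeline might prefer to route bad records to a separate output instead.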