I'm trying to read Avro files with Apache Beam and use Beam SQL to transform the data.
I'm still new to Beam and Java. Here's my simple code:
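import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BeamSQLReadAvro {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);
        /* Schema definition */
        Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));
        /* Create record/row */
        PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));
        /* SQL Transform */
        records.apply("SQL Transform 01", SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))
            /* Print output */
            .apply("Output",
                MapElements.via(
                    new SimpleFunction<Row, Row>() {
                        @Override
                        public Row apply(Row input) {
                            System.out.println("PCOLLECTION: " + input.getValues());
                            return input;
                        }
                    }
                )
            );
        p.run().waitUntilFinish();
    }
}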
It gives me this error:
Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
I don't understand: I have defined a variable called schema. Any pointers here?
Actually, there are two kinds of schemas in your pipeline: Avro schemas and Beam schemas. The Avro schema (your schema variable) is only used to parse the Avro input records. SqlTransform, however, needs its input PCollection to carry a Beam schema, and by default the PCollection<GenericRecord> returned by AvroIO has none, which is why getSchema fails. To attach one, AvroIO provides the option withBeamSchemas(boolean), which should be set to true in your case, like this:
AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
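For reference, here is a sketch of the question's pipeline with that one change applied. It assumes a Beam 2.x SDK where AvroIO lives in org.apache.beam.sdk.io (newer releases move it to org.apache.beam.sdk.extensions.avro.io in the beam-sdks-java-extensions-avro module) and that beam-sdks-java-extensions-sql is on the classpath. The readRecords helper is a hypothetical name introduced here only to isolate the read step. The print step maps rows to String instead of Row: a PCollection<Row> produced by a plain MapElements has no Beam schema attached, so Beam may not be able to infer a coder for it. That is a design choice for this sketch, not part of the fix.

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BeamSQLReadAvro {

    /**
     * Hypothetical helper: reads Avro files as GenericRecords and, via withBeamSchemas(true),
     * attaches a Beam schema derived from the Avro schema so SqlTransform can query them.
     */
    static PCollection<GenericRecord> readRecords(Pipeline p, Schema avroSchema, String filePattern) {
        return p.apply("Read Avro",
            AvroIO.readGenericRecords(avroSchema)
                .withBeamSchemas(true) // without this, the output PCollection has no Beam schema
                .from(filePattern));
    }

    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);

        // Avro schema: only used to parse the input files.
        Schema avroSchema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));

        PCollection<GenericRecord> records = readRecords(p, avroSchema, "data/RATE_CODE/*.avro");

        // SqlTransform uses the Beam schema attached to `records`, not the Avro Schema object.
        PCollection<Row> result = records.apply("SQL Transform 01",
            SqlTransform.query("SELECT RCODE, RNAME, RDESC FROM PCOLLECTION LIMIT 10"));

        // Print each row; mapping to String lets Beam infer a String coder for the output.
        result.apply("Output",
            MapElements.into(TypeDescriptors.strings())
                .via((Row row) -> {
                    String line = "PCOLLECTION: " + row.getValues();
                    System.out.println(line);
                    return line;
                }));

        p.run().waitUntilFinish();
    }
}
```

The only functional difference from the question's code is the withBeamSchemas(true) call; the SQL query and file paths are unchanged.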