Tags: java, apache-beam, beam-sql

Query Avro Schema using Beam SQL


I'm trying to read Avro files with Apache Beam and use Beam SQL to transform the data.

I'm still new to Beam and Java. Here's my simple code:

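import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
// In newer Beam releases AvroIO lives in org.apache.beam.sdk.extensions.avro.io instead.
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BeamSQLReadAvro {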
    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);

        /* Schema definition */
        Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));

        /* Create record/row */
        PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));

        /* SQL Transform */
        records.apply("SQL Transform 01", SqlTransform.query("SELECT RCODE, RNAME, RDESC FROM PCOLLECTION LIMIT 10"))

        /* Print output */
               .apply("Output",
                      MapElements.via(
                        new SimpleFunction<Row, Row>() {
                          @Override
                          public Row apply(Row input) {
                            System.out.println("PCOLLECTION: " + input.getValues());
                            return input;
                          }
                        }
                      )
               );
        p.run().waitUntilFinish();
    }
}

It gives me this error:

Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema

I don't understand this, because I have defined a variable called schema. Any pointers?


Solution

  • There are two kinds of schema in your pipeline: an Avro schema and a Beam schema. The Avro schema is only used to parse the Avro input records, whereas SqlTransform requires an input PCollection with a Beam schema attached. That is why the exception reports that there is no schema even though you defined the variable schema. AvroIO provides the option withBeamSchemas(boolean) for this; setting it to true in your case makes AvroIO infer a Beam schema from the Avro schema:

    AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
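
For context, here is a minimal sketch of how the corrected read step might fit into the pipeline from the question. It is not a tested implementation: the class name BeamSQLReadAvroFixed and the variable names avroSchema and rows are made up for illustration, the file paths are copied from the question, and the AvroIO import assumes an older Beam release (newer releases ship it in org.apache.beam.sdk.extensions.avro.io). The hasSchema() call is only there to let you check whether a Beam schema is attached before SqlTransform runs.

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
// Assumption: older Beam release. Newer releases provide
// org.apache.beam.sdk.extensions.avro.io.AvroIO instead.
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Hypothetical class name, used only for this sketch.
public class BeamSQLReadAvroFixed {
    public static void main(String[] args) throws IOException {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Avro schema: only tells AvroIO how to parse the input files.
        Schema avroSchema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));

        // withBeamSchemas(true) asks AvroIO to attach a Beam schema
        // inferred from the Avro schema to the output PCollection.
        PCollection<GenericRecord> records = p.apply(
            AvroIO.readGenericRecords(avroSchema)
                .withBeamSchemas(true)
                .from("data/RATE_CODE/*.avro"));

        // Diagnostic: reports whether a Beam schema is attached to the collection.
        System.out.println("Has Beam schema: " + records.hasSchema());

        // SqlTransform needs the Beam schema to resolve the column names.
        PCollection<Row> rows = records.apply("SQL Transform 01",
            SqlTransform.query("SELECT RCODE, RNAME, RDESC FROM PCOLLECTION LIMIT 10"));

        p.run().waitUntilFinish();
    }
}
```

The printing step from the question can be applied to rows unchanged, since the output of SqlTransform is already a PCollection of Row.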