Tags: java, avro, google-cloud-dataflow, kryo, apache-beam

Override AvroIO default Coder in Dataflow


I'm trying to use a custom Coder for processing data in Dataflow. What I did is the following:

  • Exported data from BigQuery to avro files
  • Auto-generated a class from the schema in those files using avro-tools-1.7.7.jar
  • Wrote a custom Coder for the class using Kryo
  • Annotated the class with @DefaultCoder(MyCustomCoder.class)
  • Registered my coder using p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
  • Read data from the avro files using PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class)); (a sketch of this whole setup follows the list)
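
For reference, here is roughly how those pieces fit together. This is only a sketch, assuming the Dataflow SDK 1.x-style API used above (com.google.cloud.dataflow.sdk); MyCustomClass stands for the avro-tools-generated class and MyCustomCoder for the Kryo-backed coder, so names and exact signatures may differ in your setup.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
    import com.google.cloud.dataflow.sdk.io.AvroIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // Kryo-backed coder for the Avro-generated class.
    public class MyCustomCoder extends AtomicCoder<MyCustomClass> {
      private static final long serialVersionUID = 1L;

      // Kryo instances are not thread-safe, so keep one per thread.
      private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(Kryo::new);

      public static MyCustomCoder of() {
        return new MyCustomCoder();
      }

      @Override
      public void encode(MyCustomClass value, OutputStream outStream, Context context)
          throws IOException {
        Output output = new Output(outStream);
        KRYO.get().writeObject(output, value);
        output.flush();
      }

      @Override
      public MyCustomClass decode(InputStream inStream, Context context) throws IOException {
        return KRYO.get().readObject(new Input(inStream), MyCustomClass.class);
      }

      // Pipeline wiring from the steps above: MyCustomClass itself is annotated
      // with @DefaultCoder(MyCustomCoder.class), the coder is registered, and the
      // avro files are read with AvroIO.
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);

        PCollection<MyCustomClass> pc = p.apply(
            AvroIO.Read.named("Name")
                .from("gs://bucket/path/to/*.avro")
                .withSchema(MyCustomClass.class));

        p.run();
      }
    }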

The thing is, if I have a bug in my Coder, my job only fails at a shuffle step. It doesn't look like Dataflow is using my custom Coder when loading the data from the avro files. Is that really the case? And if so, is there a way to override the Coder used for loading the data?


Solution

AvroIO currently always uses the built-in AvroCoder when reading from input files. You can change the coder later in the pipeline, as you describe; one way is shown in the sketch below. If your data is not actually encoded in a way that AvroIO can read, you should use a different source instead, for example a new subclass of FileBasedSource.
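
If changing the coder after the read is enough, a minimal way to do it (again only a sketch, assuming the same SDK 1.x-style API and the hypothetical MyCustomCoder from the question) is to call setCoder on the PCollection returned by AvroIO.Read. The files are still decoded with AvroCoder, but everything downstream, including shuffles, then uses the custom coder:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.AvroIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // AvroIO still decodes the files with the built-in AvroCoder; setCoder only
    // changes how elements are encoded from this point on (e.g. across a shuffle).
    PCollection<MyCustomClass> records = p
        .apply(AvroIO.Read.named("ReadAvro")
            .from("gs://bucket/path/to/*.avro")
            .withSchema(MyCustomClass.class))
        .setCoder(MyCustomCoder.of());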