Tags: java, apache-kafka, google-bigquery, apache-beam, avro

Kafka Avro To BigQuery using Apache Beam in Java


Here is the scenario:

I want to stream data from Kafka to BigQuery using Apache Beam, as an alternative to the BigQuerySinkConnector [WePay] for Kafka Connect.

I have been able to read Avro messages from a Kafka topic and print their contents to the console accurately. I am looking for help with writing these KafkaRecords to a BigQuery table.

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        // Customer is a class generated from the Avro schema by the Avro Maven plugin
        // Read from Kafka Topic and get KafkaRecords
        @SuppressWarnings("unchecked")
        PTransform<PBegin, PCollection<KafkaRecord<String, Customer>>> input = KafkaIO.<String, Customer>read()
                    .withBootstrapServers("http://server1:9092")
                       .withTopic("test-avro")
                       .withConsumerConfigUpdates(ImmutableMap.of("specific.avro.reader", (Object)"true"))
                       .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
                       .withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", (Object)"http://server2:8181"))
                       .withKeyDeserializer(StringDeserializer.class)
                       .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(Customer.class));
        
        
        // Print kafka records to console log
        
        pipeline.apply(input)
                .apply("ExtractRecord", ParDo.of(new DoFn<KafkaRecord<String,Customer>, KafkaRecord<String,Customer>>() { 
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        KafkaRecord<String, Customer> record = (KafkaRecord<String, Customer>) c.element();
                        KV<String, Customer> log = record.getKV();
                        System.out.println("Key Obtained: " + log.getKey());
                        System.out.println("Value Obtained: " + log.getValue().toString());
                        c.output(record);
                        
                    }
                }));
                
        // Write each record to BigQuery Table 
        // Table is already available in BigQuery so create disposition would be CREATE_NEVER
        // Records to be appended to table - so write disposition would be WRITE_APPEND
        // All fields in the Customer object have corresponding column names and datatypes - so it is one to one mapping
        
        // Connection to BigQuery is through service account JSON file. This file has been set as environment variable in run config of eclipse project
        // Set table specification for BigQuery
          // Table spec format is project:dataset.table (dataset IDs cannot contain hyphens)
          String bqTable = "my-project:my_dataset.my_table";
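
A note on the table spec: BigQueryIO accepts table specs of the form `project:dataset.table`. The sketch below is a quick sanity check for that shape. `TableSpecCheck` is a hypothetical helper, not part of Beam, and its pattern is deliberately conservative (letters, digits and underscores only for dataset and table), so treat it as an illustration rather than the exact rule Beam applies.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Beam class): checks the "project:dataset.table" shape.
final class TableSpecCheck {
    // Assumption: project IDs are lowercase letters, digits and hyphens;
    // dataset and table IDs are restricted here to letters, digits and underscores.
    private static final Pattern SPEC =
            Pattern.compile("^([a-z][a-z0-9-]*):([A-Za-z0-9_]+)\\.([A-Za-z0-9_]+)$");

    // Returns true when the string has exactly one project, dataset and table part.
    static boolean isValid(String spec) {
        return SPEC.matcher(spec).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("my-project:my_dataset.my_table")); // prints true
        System.out.println(isValid("my-project:my-dataset:my-table")); // prints false
    }
}
```

A spec with two colons, or with a hyphen in the dataset name, is rejected by this check.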

The examples currently available show how to set a schema manually and assign values field by field. I am looking for an automated way to infer the schema from the Customer Avro object and map it to the table columns directly, without manual field-by-field assignment.

Is this possible?


Solution

  • After much trial and error I was able to make the following work.

    I would welcome review comments that raise concerns or propose better solutions.

            SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("http://server2:8181", 10);
            SchemaMetadata latestSchemaMetadata;
            Schema avroSchema = null; 
            try {
                // getLatestSchemaMetadata takes the subject name which is topic-value format where "-value" is suffixed to topic
                // so if topic is "test-avro" then subject is "test-avro-value"
                latestSchemaMetadata = registryClient.getLatestSchemaMetadata("test-avro-value");
                avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());
            } catch (IOException | RestClientException e) {
                // Fail fast: without the schema the pipeline cannot be built,
                // and avroSchema would otherwise remain null below.
                throw new IllegalStateException("Could not fetch schema for subject test-avro-value", e);
            }
            
            // Printing avro schema obtained
            System.out.println("---------------- Avro schema ----------- " + avroSchema.toString());
            
            PipelineOptions options = PipelineOptionsFactory.create();
            Pipeline pipeline = Pipeline.create(options);
            
            
            // Read from Kafka Topic and get KafkaRecords
            // Create KafkaIO.Read with Avro schema deserializer
            @SuppressWarnings("unchecked")
            KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers("http://server1:9092")
                .withTopic(KafkaConfig.getInputTopic())
                .withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", "http://server2:8181"))
                .withConsumerConfigUpdates(ImmutableMap.of("specific.avro.reader", (Object)"true"))
                .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));
            
            
            // Set Beam Schema
            
            org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(avroSchema); 
            
            // Print kafka records to console log
                            
            // Write each record to BigQuery Table 
            // Table is already available in BigQuery so create disposition would be CREATE_NEVER
            // Records to be appended to table - so write disposition would be WRITE_APPEND
            // All fields in the Customer object have corresponding column names and datatypes - so it is one to one mapping
            
            // Connection to BigQuery is through service account JSON file. This file has been set as environment variable in run config of eclipse project
            // Set table specification for BigQuery
              // Table spec format is project:dataset.table (dataset IDs cannot contain hyphens)
              String bqTable = "my-project:my_dataset.my_table";
            
            pipeline.apply(read)
            .apply("ExtractRecord", ParDo.of(new DoFn<KafkaRecord<String,GenericRecord>, KV<String, GenericRecord>>() { 
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KafkaRecord<String, GenericRecord> record = (KafkaRecord<String, GenericRecord>) c.element();
                    KV<String, GenericRecord> log = record.getKV();
                    System.out.println("Key Obtained: " + log.getKey());
                    System.out.println("Value Obtained: " + log.getValue().toString());
                    c.output(log);
                    
                }
            }))
            .apply(Values.<GenericRecord>create())
            .setSchema(
                    beamSchema,
                    TypeDescriptor.of(GenericRecord.class),
                    AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
                    AvroUtils.getFromRowFunction(GenericRecord.class))
            .apply(BigQueryIO.<GenericRecord>write()
                    .to(bqTable)
                    .useBeamSchema()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
            
            pipeline.run().waitUntilFinish();
    
    

    The above also works with CreateDisposition.CREATE_IF_NEEDED.
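
    The subject lookup above relies on the Schema Registry's default naming convention, where the subject is the topic name plus a "-value" suffix (or "-key" for key schemas). A minimal sketch of that convention follows; `SubjectNames` is a hypothetical helper written for illustration, not a Confluent class, and it assumes the registry uses the default topic-based naming.

```java
// Hypothetical helper (not part of the Confluent client): derives the
// Schema Registry subject for a topic under the default topic-based naming.
final class SubjectNames {
    // Returns "<topic>-key" for key schemas and "<topic>-value" for value schemas.
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("test-avro", false)); // prints test-avro-value
        System.out.println(subjectFor("test-avro", true));  // prints test-avro-key
    }
}
```

    Deriving the subject from the topic name in one place keeps the registry lookup and the KafkaIO topic from drifting apart if the topic is renamed.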