Tags: java, json, apache-beam, parquet

How to convert JSON to Parquet in Apache Beam using Java


I am trying to convert this JSON data

{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}

and I need it converted to Parquet,

so I wrote some code using Apache Beam:

package org.apache.beam.examples;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.kitesdk.data.spi.JsonUtil;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

public class Main {

    public static void main(String[] args) throws IOException {

        Pipeline pipeLine = Pipeline.create();
        PCollection<String> lines = pipeLine.apply("ReadMyFile", TextIO.read().from("path-to-file"));

        File initialFile = new File("path-to-file");
        InputStream targetStream = Files.newInputStream(initialFile.toPath());
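        // Infer an Avro schema from a sample of the JSON input using the Kite SDK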
        Schema jsonSchema = JsonUtil.inferSchema(targetStream, "RecordName", 20);
        System.out.println(jsonSchema.getDoc());
        PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
//                out.output(parsedMap);
//                System.out.println(Arrays.toString(parsedMap.toString().getBytes(StandardCharsets.UTF_8)));
                JsonAvroConverter avroConverter = new JsonAvroConverter();
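                // Uncommenting the next line throws the NotSerializableException shown below: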
//                GenericRecord record =  avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);

//                context.output(record);
            }
        }));
        pipeLine.run();
//        pgr.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to("path/to/save"));
        
    }
}

I am able to read the JSON line by line, but I am unable to convert it to Parquet. The code above throws an error if you try to convert the JSON to a GenericRecord using

GenericRecord record =  avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);

The error caused by this line:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 25 more

Solution

  • Avro's Schema (org.apache.avro.Schema$RecordSchema) does not implement Serializable, but Beam must serialize the DoFn, so the schema cannot be held as a Schema field. I created a new DoFn class that takes the schema as a constructor parameter, stores it as a String, and re-parses it in @Setup:

    Schema jsonSchema = new Schema.Parser().parse(schemaString);
    pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
                    .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
                    .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema))
    
    
    static class JsonToGeneralRecord extends DoFn<String, GenericRecord> {

        private static final Logger logger = LogManager.getLogger(JsonToGeneralRecord.class);
    
        private final String schemaString;
        private Schema jsonSchema;
    
    
        // constructor
        JsonToGeneralRecord(Schema schema) {
            schemaString = schema.toString();
        }
    
        @Setup
        public void setup() {
            jsonSchema = new Schema.Parser().parse(schemaString);
        }
    
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
    
            Gson gson = new GsonBuilder().create();
            JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
            logger.info("successful: " + parsedMap.toString());
    
            JsonAvroConverter avroConverter = new JsonAvroConverter();
            try {
                GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
                c.output(record);
            } catch (Exception e) {
                logger.error("error:  " + e.getMessage() + parsedMap);
                e.printStackTrace();
            }
    
        }
    }
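
To finish the job, the converted records still need to be written as Parquet, which the question's code left commented out. Below is a minimal end-to-end sketch, assuming an inline Avro schema matching the sample records (the schema inferred by JsonUtil would work as well); it needs the beam-sdks-java-io-parquet dependency, and note that on recent Beam releases AvroCoder has moved to the Avro extensions module:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.transforms.ParDo;

    public class JsonToParquet {
        public static void main(String[] args) {
            // Assumed schema for the sample data: col1 is a string, col2 a double.
            Schema jsonSchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"RecordName\",\"fields\":["
                            + "{\"name\":\"col1\",\"type\":\"string\"},"
                            + "{\"name\":\"col2\",\"type\":\"double\"}]}");

            Pipeline pipeLine = Pipeline.create();
            pipeLine
                    .apply("ReadMyFile", TextIO.read().from("path-to-file"))
                    .apply("Convert Json To GenericRecord",
                            ParDo.of(new JsonToGeneralRecord(jsonSchema)))
                    .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema))
                    // ParquetIO.sink writes each GenericRecord into Parquet files.
                    .apply("WriteParquet", FileIO.<GenericRecord>write()
                            .via(ParquetIO.sink(jsonSchema))
                            .to("path/to/save")
                            .withSuffix(".parquet"));
            pipeLine.run().waitUntilFinish();
        }
    }

Keeping the schema as a String inside the DoFn (as in JsonToGeneralRecord above) is what makes the pipeline serializable; the Schema object itself is only rebuilt on the workers in @Setup.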