
Apache Spark Dataset.foreach with Aerospike client


I want to retrieve rows from Apache Hive via Apache Spark and put each row into the Aerospike cache.

Here is a simple case.

var dataset = session.sql("select * from employee");
final var aerospikeClient = aerospike;  // local copy so the lambda does not capture the enclosing service class
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    aerospikeClient.add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

I get an error:

Caused by: java.io.NotSerializableException: com.aerospike.client.reactor.AerospikeReactorClient

Obviously I can't make AerospikeReactorClient serializable. I tried adding dataset.collectAsList(), and that did work. But as far as I understand, that method loads the entire result set onto the driver node. There might be an enormous amount of data, so it's not an option.

What are the best practices to deal with such problems?


Solution

  • I managed to overcome this issue by creating the AerospikeClient manually inside the foreach lambda.

    var dataset = session.sql("select * from employee");
    dataset.foreach(row -> {
        var key = new Key("namespace", "set", randomUUID().toString());
        // newAerospikeClient is a helper that builds a client from the (serializable) properties
        newAerospikeClient(aerospikeProperties).add(
            key,
            new Bin(
                "json-repr",
                row.json()
            )
        );
    });
    

    Now I only have to declare AerospikeProperties as Serializable, so Spark can ship it to the executors instead of the non-serializable client.
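
    The key point is that only the small properties object crosses the serialization boundary; the client itself is constructed on the executor. A minimal sketch of such a holder is below — the field names and the `newAerospikeClient` helper are assumptions for illustration, not part of the Aerospike API, and the real helper body is left as a comment so the sketch compiles without the Aerospike jar:

    ```java
    import java.io.*;

    // Hypothetical serializable holder for connection settings.
    public class AerospikeProperties implements Serializable {
        private static final long serialVersionUID = 1L;

        final String host;
        final int port;

        AerospikeProperties(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // The helper called inside the foreach lambda would look roughly like:
        // static AerospikeClient newAerospikeClient(AerospikeProperties p) {
        //     return new AerospikeClient(p.host, p.port);
        // }

        public static void main(String[] args) throws Exception {
            var props = new AerospikeProperties("localhost", 3000);

            // Round-trip through Java serialization, as Spark does when
            // shipping a closure's captured variables to executors.
            var bytes = new ByteArrayOutputStream();
            try (var out = new ObjectOutputStream(bytes)) {
                out.writeObject(props);
            }
            try (var in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                var copy = (AerospikeProperties) in.readObject();
                System.out.println(copy.host + ":" + copy.port);
            }
        }
    }
    ```

    Note that this creates one client per row; that works, but it is expensive, so keeping the properties object small and cheap to serialize matters.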