java, hazelcast, hazelcast-jet

How to create a new DistributedFunction


This programming paradigm is very new to me. I would like to replace the anonymous function in .map() with a DistributedFunction defined in a separate class, but I'm not sure how to create the new function.

I have the following pipeline:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(e -> {
    Gson gson = new Gson();

    KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(), 
    KafkaMessage.class);

    byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

    try {
       kafkaMessage.setData(new String(encodedData, "utf-8"));
    } catch (Exception e1) {
       // TODO Auto-generated catch block
       e1.printStackTrace();
    }

    return kafkaMessage;             
  })
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

Based on some Jet examples, I ended up with the following:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(KafkaHelper::decodeKafkaMessage)
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

KafkaHelper class:

public final class KafkaHelper implements Serializable {

    private static final long serialVersionUID = -3556269069192202060L;

    public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {

        Gson gson = new Gson();

        KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);

        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }

        return kafkaMessage;
    }

}

Does this approach meet the requirement to pass a DistributedFunction to .map()? If yes, why? If not, what changes should I make?


Solution

  • Yes, in both examples you are creating an instance of DistributedFunction and passing it to map(). DistributedFunction is a functional interface, so wherever one is expected the Java 8 compiler accepts a lambda expression (your first example) or a method reference (your second example) and turns it into an instance of a synthetic subtype that implements the interface's Single Abstract Method ("SAM") with the code you supplied.

    Your KafkaHelper doesn't have to be Serializable because you never instantiate it; only the function object passed to map() is serialized. You can also move the static method decodeKafkaMessage to any other class, because it does not depend on any instance state.
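
The points above can be checked without a Jet cluster. The following is a minimal sketch, not Jet's own code: SerializableFunction is a hypothetical stand-in for DistributedFunction (assumed here to be a Function that is also Serializable), and DecodeHelper, DistributedFunctionSketch and roundTrip are names invented for the illustration. It shows that a lambda and a static method reference both become serializable function objects, even though the helper class itself is not Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.function.Function;

// Hypothetical stand-in for DistributedFunction: a Function that is also Serializable.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}

// Deliberately NOT Serializable: it is never instantiated, only its static method is referenced.
final class DecodeHelper {
    private DecodeHelper() {
    }

    static String decodeBase64(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}

final class DistributedFunctionSketch {

    // Serializes the function object and reads it back, roughly what has to
    // happen before the function can run on another cluster member.
    @SuppressWarnings("unchecked")
    static <T, R> SerializableFunction<T, R> roundTrip(SerializableFunction<T, R> fn)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableFunction<T, R>) in.readObject();
        }
    }

    // The compiler converts the lambda to the target type SerializableFunction.
    static SerializableFunction<String, String> asLambda() {
        return s -> DecodeHelper.decodeBase64(s);
    }

    // The same conversion applies to a static method reference.
    static SerializableFunction<String, String> asMethodReference() {
        return DecodeHelper::decodeBase64;
    }

    public static void main(String[] args) throws Exception {
        String encoded = Base64.getEncoder()
                .encodeToString("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(roundTrip(asLambda()).apply(encoded));
        System.out.println(roundTrip(asMethodReference()).apply(encoded));
    }
}
```

Both calls in main survive the serialization round trip and decode the input. If DecodeHelper had to be Serializable for this to work, writeObject would fail; it does not, because only the function object is written.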