Hazelcast Jet vs Java 8 streams


I'm trying to sort a List<Map<String,Object>> object by date and pick the entry with the max date using Hazelcast Jet.

Here is my Java 8 code that works:

public static List<Map<String, Object>> extractDate1(List<Map<String, Object>> data) {
    return data.stream().map(value -> new Object() {
        Map<String, Object> theMap = value;
        LocalDate date = extractDate(value);
    }).sorted(Comparator.comparing(obj -> obj.date)).map(obj -> obj.theMap).collect(Collectors.toList());
}

public static LocalDate extractDate(Map<String, Object> value) {
    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("dd-MM-yyyy");
    DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d-MM-yyyy");
    return LocalDate.parse(LocalDate.parse(value.get("effectiveDate").toString(), formatter2).format(formatter1),
            formatter);
}

The above Java 8 code sorts the map objects by date from low to high.

Below is the Jet code I'm using for the extraction; it also gives the correct output. However, I would like to use Hazelcast Jet's aggregate/rolling functions instead.

// fetching jsonb type data from db
BatchStage<Object> jobJson = dbValue
        // this model holds the JSON value as a string;
        // convert the JSON data to a Map object
        .map(model -> JsonUtil.mapFrom(model.getJosnValue()))
        // skip empty maps and maps without a "records" array
        .filter(map -> !map.isEmpty() && map.containsKey("records"))
        .map(map -> {
            // each JSON/map object holds a "records" array with multiple JSON objects;
            // sort them by date with the external function (the Java 8 code above)
            @SuppressWarnings("unchecked")
            List<Map<String, Object>> extractedDateValue = extractDate1(
                    (List<Map<String, Object>>) map.get("records"));
            // the list is sorted in ascending order, so the last element has the max date
            return extractedDateValue.get(extractedDateValue.size() - 1);
        });

JSON data example:

{
    "id": "01",
    "records": [{
        "location": "xyz1",
        "effectiveDate": "02-03-2021"
    }, {
        "location": "xyz2",
        "effectiveDate": "02-04-2021"
    }]
}

Expected output:

{
    "location": "xyz2",
    "effectiveDate": "02-04-2021"
}

Is it possible to achieve this with Hazelcast Jet rolling aggregations? Any suggestions would be helpful. Thanks.


Solution

  • Consider flatMapping the pipeline and finding the maximum using topN. flatMap would convert each JSON structure to a series of [id, location, effectiveDate] records. See the documentation of flatMap for a code sample.

    It's not clear whether you want to find the max element in the whole collection or the max element per id. Adding a groupingKey would find the maximum per id.

    The pipeline shape in pseudocode:

    source // stream of JSON structures
    .flatMap // stream [id, location, effectiveDate]
    .groupingKey // for maximum per id, remove for global max
    .aggregate(AggregateOperations.topN) // finds max 
    .sink;
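
To make the shape above more concrete, here is a minimal sketch of the same logic (flatten the records, group by id, keep the max per group) written with plain java.util.stream rather than the Jet API, so it can be run without a cluster. The class name LatestRecordPerId, the Row helper, and the sample data builder are hypothetical, and the dd-MM-yyyy date pattern is an assumption taken from the sample JSON. In a Jet pipeline the corresponding steps would be flatMap, groupingKey, and aggregate with something like AggregateOperations.topN(1, comparator) or AggregateOperations.maxBy(comparator).

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class LatestRecordPerId {

    // Assumption: effectiveDate uses dd-MM-yyyy, as in the sample JSON.
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Hypothetical flattened row: one element of "records" tagged with its parent id.
    static final class Row {
        final String id;
        final Map<String, Object> fields;
        final LocalDate date;

        Row(String id, Map<String, Object> fields) {
            this.id = id;
            this.fields = fields;
            this.date = LocalDate.parse(fields.get("effectiveDate").toString(), DATE_FORMAT);
        }
    }

    @SuppressWarnings("unchecked")
    static Map<String, Map<String, Object>> latestPerId(List<Map<String, Object>> documents) {
        return documents.stream()
                // skip documents that have no "records" array
                .filter(doc -> doc.containsKey("records"))
                // flatMap step: one Row per element of "records"
                .flatMap(doc -> ((List<Map<String, Object>>) doc.get("records")).stream()
                        .map(fields -> new Row(doc.get("id").toString(), fields)))
                // grouping key (id) plus a max-by-date aggregation per group
                .collect(Collectors.groupingBy(
                        row -> row.id,
                        Collectors.collectingAndThen(
                                Collectors.maxBy(Comparator.comparing((Row row) -> row.date)),
                                best -> best.get().fields)));
    }

    // Builds one entry of the "records" array.
    static Map<String, Object> entry(String location, String effectiveDate) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("location", location);
        fields.put("effectiveDate", effectiveDate);
        return fields;
    }

    // Mirrors the JSON data example from the question.
    static List<Map<String, Object>> sampleDocuments() {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", "01");
        doc.put("records", Arrays.asList(entry("xyz1", "02-03-2021"), entry("xyz2", "02-04-2021")));
        return Collections.singletonList(doc);
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> latest = latestPerId(sampleDocuments());
        System.out.println(latest.get("01").get("location")); // xyz2
    }
}
```

Dropping the grouping step and taking a single max over all rows would give the global maximum instead of one result per id, which matches the "remove for global max" note in the pseudocode.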