How to effectively chain groupby queries from flat api data in Kafka Streams?


I have some random data coming from an API into a Kafka topic that looks like this:

{"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"}
{"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"}
{"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"}
{"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"}

I am able to create KStream objects and observe them, like this:

KStream<byte[], UsedCars> usedCarsInputStream =
    builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));

// key = year, value = count of cars in that year
// (groupBy sets the new key itself, so no selectKey is needed first)
KTable<String, Long> yearCount = usedCarsInputStream
    .filter((k, v) -> v.getYear() > 2010)
    .groupBy((key, value) -> Integer.toString(value.getYear()),
             Grouped.with(Serdes.String(), new UsedCarsSerdes()))
    .count();

// print() returns void, so it cannot be part of the KTable assignment above
yearCount.toStream().print(Printed.<String, Long>toSysOut().withLabel("blah"));

This gives a count of the records per year for years after 2010. What I would like to do next, but have been unable to accomplish, is to take each of those years, as in a foreach, and count the number of cars of each color within that year. I tried writing a foreach on yearCount.toStream() to process the data further, but got no results.

I am looking for output that might look like this:

{
  "2011": [
    {
      "blue": "99",
      "green": "243",
      "red": "33"
    }
  ],
  "2012": [
    {
      "blue": "74",
      "green": "432",
      "red": "2"
    }
  ]
}

Solution

  • I believe I may have answered my own question, and I welcome comments on the solution.

    What I did not realize is that you can group by a compound object, that is, a key made up of several fields. In this case, I needed the equivalent of the following SQL statement:

    SELECT   year, color, COUNT(*)
    FROM     used_car_colors AS years
    GROUP BY year, color
    

    In Kafka Streams, you can accomplish this by creating an object -- in this situation, I created a POJO class called 'YearColor' with members year and color -- and then select that as a key in Kafka Streams:

    usedCarsInputStream
        .selectKey((k, v) -> new YearColor(v.getYear(), v.getColor()))
        .groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
        .count()
        .toStream()
        .peek((yc, ct) -> System.out.println(
            "year: " + yc.getYear() + " color: " + yc.getColor() + " count: " + ct));
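The YearColor class used as the key is not shown in the post. A minimal sketch of what such a key might look like follows; the field layout, the toBytes/fromBytes helper names, and the byte format are all assumptions made for illustration. The one property that matters for a grouping key is that equal keys always serialize to identical bytes, because Kafka partitions records and looks up state-store entries by the serialized key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical composite key; the original YearColor class is not shown in the post.
final class YearColor {
    private final int year;
    private final String color;

    YearColor(int year, String color) {
        this.year = year;
        this.color = Objects.requireNonNull(color, "color");
    }

    int getYear() { return year; }
    String getColor() { return color; }

    // Assumed encoding: 4-byte big-endian year followed by the UTF-8 color name.
    // It is deterministic, so equal keys always produce the same bytes.
    byte[] toBytes() {
        byte[] colorBytes = color.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + colorBytes.length)
                .putInt(year)
                .put(colorBytes)
                .array();
    }

    // Inverse of toBytes(): read the year, then treat the remainder as the color.
    static YearColor fromBytes(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int year = buf.getInt();
        byte[] colorBytes = new byte[buf.remaining()];
        buf.get(colorBytes);
        return new YearColor(year, new String(colorBytes, StandardCharsets.UTF_8));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof YearColor)) return false;
        YearColor other = (YearColor) o;
        return year == other.year && color.equals(other.color);
    }

    @Override
    public int hashCode() {
        return Objects.hash(year, color);
    }

    @Override
    public String toString() {
        return "YearColor(" + year + ", " + color + ")";
    }
}
```

In a real application, these two helpers could back the Serializer and Deserializer that a class like YearColorSerdes exposes. A JSON or Avro encoding would work as well, provided it is equally deterministic.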
    

    You do have to implement a Serializer and Deserializer for this key class (I did so in YearColorSerdes). When the Kafka Streams application runs, it prints each updated count as it changes, for example:

    year: 2012 color: Maroon count: 2
    year: 2013 color: Khaki count: 1
    year: 2012 color: Crimson count: 5
    year: 2011 color: Pink count: 4
    year: 2011 color: Green count: 2
    

    which is what I was looking for.
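The counts above arrive as flat (year, color, count) updates, while the question asked for colors nested under each year. A rough sketch of folding the flat updates into that nested shape is shown here. YearColorReport is a hypothetical helper, not part of Kafka Streams, and it assumes every update carries the latest running total for its (year, color) pair, so a newer value simply replaces the older one.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: keeps the most recent count for every (year, color) pair,
// nested as year -> color -> count.
final class YearColorReport {
    private final Map<Integer, Map<String, Long>> byYear = new TreeMap<>();

    // Each update is a new running total, so it overwrites any earlier value.
    synchronized void update(int year, String color, long count) {
        byYear.computeIfAbsent(year, y -> new TreeMap<>()).put(color, count);
    }

    // Returns a deep copy so callers can read it without seeing later updates.
    synchronized Map<Integer, Map<String, Long>> snapshot() {
        Map<Integer, Map<String, Long>> copy = new TreeMap<>();
        byYear.forEach((year, colors) -> copy.put(year, new TreeMap<>(colors)));
        return copy;
    }
}
```

Fed with the five sample lines above, snapshot() would hold {2011={Green=2, Pink=4}, 2012={Crimson=5, Maroon=2}, 2013={Khaki=1}}. One way to feed it would be a terminal foreach on the counted stream that calls update(yc.getYear(), yc.getColor(), ct). Note that such an in-memory view only sees the partitions assigned to that application instance.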