I have some random data coming from an API into a Kafka topic that looks like this:
{"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"}
{"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"}
{"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"}
{"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"}
I am able to create KStream objects and observe them, like this:
KStream<byte[], UsedCars> usedCarsInputStream =
builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));
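// k, v => year, count of cars in that year
KTable<String, Long> yearCount = usedCarsInputStream
    .filter((k, v) -> v.getYear() > 2010)
    .selectKey((k, v) -> v.getVin())
    // groupBy re-keys by year; the serdes are given explicitly here, assuming no matching defaults are configured
    .groupBy((key, value) -> Integer.toString(value.getYear()),
             Grouped.with(Serdes.String(), new UsedCarsSerdes()))
    .count();
// print() returns void, so it cannot be part of the KTable assignment
yearCount.toStream().print(Printed.<String, Long>toSysOut().withLabel("blah"));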
This of course gives us a count of the records for each year after 2010. What I would like to do next, but have been unable to accomplish, is to take each of those years, as in a foreach, and count the number of cars of each color per year. I attempted writing a foreach on yearCount.toStream() to further process the data, but got no results.
I am looking for output that might look like this:
{
  "2011": [
    {
      "blue": "99",
      "green": "243",
      "red": "33"
    }
  ],
  "2012": [
    {
      "blue": "74",
      "green": "432",
      "red": "2"
    }
  ]
}
I believe I may have answered my own question. I would welcome any others to comment on my own solution.
What I did not realize is that you can group by a compound key, that is, an object holding more than one field. In this case, I needed the equivalent of the following SQL statement:
SELECT year, color, count(*) FROM used_car_colors AS years
GROUP BY year, color
In Kafka Streams, you can accomplish this by creating an object -- in this situation, I created a POJO class called 'YearColor' with members year and color -- and then selecting that object as the key:
usedCarsInputStream
.selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
.groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
.count()
.toStream()
.peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor()
+ " count: " + ct));
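The snippet above relies on a YearColor key class that is not shown. Here is a minimal sketch of what it might look like, assuming an int year and a String color. The toBytes/fromBytes methods and the byte layout are my own assumptions for illustration, not the original YearColorSerdes. The main point is that equal keys should produce identical bytes, since grouping and the count store work on the serialized key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical composite key; the original YearColor class was not posted. */
final class YearColor {
    private final int year;
    private final String color;

    YearColor(int year, String color) {
        this.year = year;
        this.color = Objects.requireNonNull(color, "color");
    }

    int getYear() { return year; }

    String getColor() { return color; }

    /** Assumed layout: 4-byte big-endian year followed by the UTF-8 color. */
    byte[] toBytes() {
        byte[] colorBytes = color.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + colorBytes.length)
                .putInt(year)
                .put(colorBytes)
                .array();
    }

    /** Inverse of toBytes(); expects the layout described above. */
    static YearColor fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int year = buffer.getInt();
        byte[] colorBytes = new byte[buffer.remaining()];
        buffer.get(colorBytes);
        return new YearColor(year, new String(colorBytes, StandardCharsets.UTF_8));
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof YearColor)) return false;
        YearColor that = (YearColor) other;
        return year == that.year && color.equals(that.color);
    }

    @Override
    public int hashCode() {
        return Objects.hash(year, color);
    }

    @Override
    public String toString() {
        return "YearColor{year=" + year + ", color=" + color + "}";
    }
}
```

With a class like this, a Serde could probably be assembled with Serdes.serdeFrom(serializer, deserializer), where the two arguments delegate to toBytes() and fromBytes(). A real implementation would also need to handle null keys and values, which this sketch does not.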
You of course have to implement the Serializer and Deserializer for this object (and I did with YearColorSerdes()). My output when running the Kafka Streams application gives me updates on the modified counts, a la:
year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2
which is what I was looking for.
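If the nested per-year shape from the question is still wanted, the flat (year, color) updates can be folded into a map of maps. Below is a rough sketch in plain Java, independent of Kafka Streams; the class name ColorCountsByYear and its methods are hypothetical. It assumes each update carries the latest running count for its key, which is how the output above behaves, so a newer count simply replaces the older one.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: folds flat (year, color, count) updates into year -> color -> count. */
final class ColorCountsByYear {
    // TreeMap keeps years and colors sorted, which makes the printed result stable.
    private final Map<Integer, Map<String, Long>> countsByYear = new TreeMap<>();

    /** Applies one update; a later count for the same year and color replaces the earlier one. */
    void apply(int year, String color, long count) {
        countsByYear.computeIfAbsent(year, y -> new TreeMap<>()).put(color, count);
    }

    /** Returns the live nested view built so far. */
    Map<Integer, Map<String, Long>> view() {
        return countsByYear;
    }
}
```

Something like this could be called from the peek (or a foreach) on the counted stream, though that only works within a single application instance. Doing the same inside the topology would mean re-grouping the counts by year and aggregating them, which I have not tried here.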