This question is the follow-up of my previous one where I asked about serialization of Kafka stream using custom Avro
Serdes. Now I have a different issue when trying to configure a JSON
Serde. I have this kafka stream topology where I use a groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
.
@Service
@Slf4j
@EnableBinding(PosListenerAvroJsonBinding.class)
public class NotificationAvroJsonProcessorService {
@Autowired
RecordBuilder recordBuilder;
@StreamListener("notification-input-avro-channel")
@SendTo("notification-output-json-channel")
public KStream<String, Notification> process(KStream<String, PosInvoiceAvro> input) {
/* with reduce transformation and serialization with KTable */
KStream<String, Notification> notificationJsonKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
// ***********************************************
// THIS DOES NOT WORK WITH JSON, only works with AVRO.
.groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
// ***********************************************
.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
})
.toStream();
notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));
return notificationJsonKStream;
}
}
I defined the Custom serializer based on this web page. I think I have to use the com.fasterxml.jackson.databind.JsonNode
, but it did not work. I also tested the other options that are commented and they also did not work.
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
, new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JsonNode")
// , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.github.felipegutierrez.explore.spring.model.Notification")
// , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JavaType")
, new AbstractMap.SimpleEntry<>(TYPE_PROPERTY, TYPE_PROPERTY_DEFAULT)
// , new AbstractMap.SimpleEntry<>("json.value.type", "org.springframework.kafka.support.serializer.JsonSerializer")
)
public static Serde<Notification> Notification() {
final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<Notification>();
notificationSerde.configure(serdeConfig, false);
return notificationSerde;
}
On the web page also say to define the type.property=javaType, the JSON schema could specify "javaType":"org.acme.MyRecord" at the top level
of the java class.
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSchemaInject(strings = {@JsonSchemaString(path = "javaType", value = "com.github.felipegutierrez.explore.spring.model.Notification")})
public class Notification {
@JsonProperty("InvoiceNumber")
private String InvoiceNumber;
@JsonProperty("CustomerCardNo")
private String CustomerCardNo;
@JsonProperty("TotalAmount")
private Double TotalAmount;
@JsonProperty("EarnedLoyaltyPoints")
private Double EarnedLoyaltyPoints;
@JsonProperty("TotalLoyaltyPoints")
private Double TotalLoyaltyPoints = 0.0;
}
When I was using the default JSON serializer for Spring + Kafka I just set spring.json.add.type.headers: false
on the application.yaml
and it worked. But I cannot find such property on the Confluent serializer.
Finally, the error is below. I think the way to go is to use a correct parameter at my notificationSerde.configure(serdeConfig, false);
because when I change the json serializer there I see that the application tries to cast to different classes. But I don't know which configuration I have to place there.
Caused by: java.lang.ClassCastException: class com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to class com.github.felipegutierrez.explore.spring.model.Notification (com.fasterxml.jackson.databind.node.ObjectNode and com.github.felipegutierrez.explore.spring.model.Notification are in unnamed module of loader 'app')
I fixed using this configuration of the Serde.
@Service
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
, new AbstractMap.SimpleEntry<>(FAIL_INVALID_SCHEMA, "true")
, new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, Notification.class.getName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static Serde<Notification> Notification() {
final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<>();
notificationSerde.configure(serdeConfig, false);
return notificationSerde;
}
}
and added the Materialized.with(CustomSerdes.String(), CustomSerdes.Notification())
to the reducer as well as it is suggested here.
KStream<String, Notification> notificationJsonKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
.groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
},
Named.as("notification-reducer"),
Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()))
.toStream();
notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));