Search code examples
apache-kafkaapache-kafka-streamsspring-kafka

Kafka Streams with custom JSON serializer


This question is the follow-up of my previous one where I asked about serialization of Kafka stream using custom Avro Serdes. Now I have a different issue when trying to configure a JSON Serde. I have this kafka stream topology where I use a groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification())).

@Service
@Slf4j
@EnableBinding(PosListenerAvroJsonBinding.class)
public class NotificationAvroJsonProcessorService {
    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-avro-channel")
    @SendTo("notification-output-json-channel")
    public KStream<String, Notification> process(KStream<String, PosInvoiceAvro> input) {

        /* with reduce transformation and serialization with KTable */
        KStream<String, Notification> notificationJsonKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
                // ***********************************************
                // THIS DOES NOT WORK WITH JSON, only works with AVRO.
                .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
                // ***********************************************
                .reduce((aggValue, newValue) -> {
                    newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                    return newValue;
                })
                .toStream();
        notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));
        return notificationJsonKStream;
    }
}

I defined the Custom serializer based on this web page. I think I have to use the com.fasterxml.jackson.databind.JsonNode, but it did not work. I also tested the other options that are commented and they also did not work.

public class CustomSerdes extends Serdes {
    private final static Map<String, String> serdeConfig = Stream.of(
            new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
            , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JsonNode")
            // , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.github.felipegutierrez.explore.spring.model.Notification")
            // , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JavaType")
            , new AbstractMap.SimpleEntry<>(TYPE_PROPERTY, TYPE_PROPERTY_DEFAULT)
            // , new AbstractMap.SimpleEntry<>("json.value.type", "org.springframework.kafka.support.serializer.JsonSerializer")
    )

    public static Serde<Notification> Notification() {
        final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<Notification>();
        notificationSerde.configure(serdeConfig, false);
        return notificationSerde;
    }

On the web page also say to define the type.property=javaType, the JSON schema could specify "javaType":"org.acme.MyRecord" at the top level of the java class.

@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSchemaInject(strings = {@JsonSchemaString(path = "javaType", value = "com.github.felipegutierrez.explore.spring.model.Notification")})
public class Notification {
    @JsonProperty("InvoiceNumber")
    private String InvoiceNumber;
    @JsonProperty("CustomerCardNo")
    private String CustomerCardNo;
    @JsonProperty("TotalAmount")
    private Double TotalAmount;
    @JsonProperty("EarnedLoyaltyPoints")
    private Double EarnedLoyaltyPoints;
    @JsonProperty("TotalLoyaltyPoints")
    private Double TotalLoyaltyPoints = 0.0;
}

When I was using the default JSON serializer for Spring + Kafka I just set spring.json.add.type.headers: false on the application.yaml and it worked. But I cannot find such property on the Confluent serializer.

Finally, the error is below. I think the way to go is to use a correct parameter at my notificationSerde.configure(serdeConfig, false); because when I change the json serializer there I see that the application tries to cast to different classes. But I don't know which configuration I have to place there.

Caused by: java.lang.ClassCastException: class com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to class com.github.felipegutierrez.explore.spring.model.Notification (com.fasterxml.jackson.databind.node.ObjectNode and com.github.felipegutierrez.explore.spring.model.Notification are in unnamed module of loader 'app')


Solution

  • I fixed using this configuration of the Serde.

    @Service
    public class CustomSerdes extends Serdes {
    private final static Map<String, String> serdeConfig = Stream.of(
                new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
                , new AbstractMap.SimpleEntry<>(FAIL_INVALID_SCHEMA, "true")
                , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, Notification.class.getName()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
        public static Serde<Notification> Notification() {
            final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<>();
            notificationSerde.configure(serdeConfig, false);
            return notificationSerde;
        }
    }
    

    and added the Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()) to the reducer as well as it is suggested here.

            KStream<String, Notification> notificationJsonKStream = input
                    .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                    .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
                    .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
                    .reduce((aggValue, newValue) -> {
                                newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                                return newValue;
                            },
                            Named.as("notification-reducer"),
                            Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()))
                    .toStream();
            notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));