Search code examples
javaspring-bootserializationapache-kafkaredis

Spring boot Redis and Kafka Serialization Error


We use spring cloud stream to produce the event to kafka and it was all working fine until we started to introduce redis cache.

I think for some reason the combination of Redis and Kafka on the serialization fails.

At this stage, we seem to have the redis cache working fine but emitting an event to kafka broken and getting the below Redis exception while producing the event onto Kafka.

Error while producing SampleEvent: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'sampleOutput'; 
nested exception is org.springframework.data.redis.serializer.SerializationException: Could not write JSON: Not an array: 
{"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: 
Not an array: {"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]), failedMessage=GenericMessage [payload={"field": "value"}, headers={id=16108acf-6ca1-f1b2-2ed3-44eb857daa6b, contentType=application/*+avro, timestamp=1712741537549}]

Spring boot: 2.4.5

Spring cloud stream: 2020.0.2

And this github repo contains the reproducible example.

https://github.com/krishgokul494/redisandkafka.git

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sample</groupId>
    <artifactId>redisandkafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>redisandkafka</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
        <spring-boot.version>2.4.5</spring-boot.version>
        <avro.version>1.8.2</avro.version>
        <confluent.version>4.0.0</confluent.version>
        <jacoco.version>0.8.5</jacoco.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-schema-registry-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.hibernate</groupId>
                    <artifactId>hibernate-validator</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.sample.redisandkafka.Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>${jacoco.version}</version>
                <configuration>
                    <excludes>
                        <exclude>**/avro/**/*</exclude>
                    </excludes>
                </configuration>
                <executions>
                    <execution>
                        <id>default-prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>default-report</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>kafka-binder</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </profile>

        <profile>
            <id>repo</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <repositories>
                <repository>
                    <id>spring-libs-milestones</id>
                    <name>Spring Milestones</name>
                    <url>https://repo.spring.io/milestone</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>spring-milestones</id>
                    <name>Spring libs-Milestones</name>
                    <url>https://repo.spring.io/libs-milestone/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>confluent</id>
                    <url>http://packages.confluent.io/maven/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
    <pluginRepositories>
        <pluginRepository>
            <id>repository.spring.release</id>
            <name>Spring GA Repository</name>
            <url>https://repo.spring.io/plugins-release/</url>
        </pluginRepository>
    </pluginRepositories>

</project>

SampleKafkaSourceProducer.java

package com.sample.redisandkafka.kafka;

import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleOutputSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(SampleOutputSource.class)
@Slf4j
public class SampleKafkaSourceProducer {

  @Autowired
  private SampleOutputSource sampleOutputSource;

  public boolean produceSampleEvent() {
    SampleEvent sampleEvent = SampleEvent.newBuilder().setField("value").build();

    try {
      sampleOutputSource.sampleMessageChannel().send(MessageBuilder.withPayload(sampleEvent).build());
    } catch(Exception ex) {
      log.error("Error while producing SampleEvent: {}", ex.toString());
      return false;
    }
    return true;
  }

}

KafkaConfig.java

package com.sample.redisandkafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class KafkaConfig {

  @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
  private String endpoint;

  @Bean
  public SchemaRegistryClient confluentSchemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
  }


  @Bean("kafkaBinderHeaderMapper")
  public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    BinderHeaderMapper mapper = new BinderHeaderMapper();
    mapper.setEncodeStrings(true);
    return mapper;
  }
}

RedisCacheConfig.java

package com.sample.redisandkafka.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;


@EnableCaching
@Configuration
@ConditionalOnProperty(name = "spring.cache.enabled", havingValue = "true")
public class RedisCacheConfig extends CachingConfigurerSupport {

  @Autowired
  private CacheConfigurationProperties cacheConfigurationProperties = null;

  private RedisCacheConfiguration createCacheConfiguration(long timeoutInHours) {
    return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(timeoutInHours))
     .serializeValuesWith(RedisSerializationContext.SerializationPair
     .fromSerializer(new GenericJackson2JsonRedisSerializer()));
  }

  @Bean
  public CacheManager cacheManager(LettuceConnectionFactory redisConnectionFactory) {
    Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();

    if (Objects.nonNull(cacheConfigurationProperties.getCachesTTL())) {
      for (Entry<String, Long> cacheNameAndTimeout : cacheConfigurationProperties.getCachesTTL()
          .entrySet()) {
        cacheConfigurations.put(cacheNameAndTimeout.getKey(),
            createCacheConfiguration(cacheNameAndTimeout.getValue()));
      }
    }

    return RedisCacheManager.builder(redisConnectionFactory)
        .cacheDefaults(createCacheConfiguration(cacheConfigurationProperties.getDefaultTTL()))
        .withInitialCacheConfigurations(cacheConfigurations).build();
  }

  @Bean
  public LettuceConnectionFactory redisConnectionFactory() {
    RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(
        cacheConfigurationProperties.getHost(), cacheConfigurationProperties.getPort());

    return new LettuceConnectionFactory(redisStandaloneConfiguration);
  }

  @Configuration
  @ConfigurationProperties(prefix = "spring.cache")
  @Data
  class CacheConfigurationProperties {
    private String host;
    private int port;
    private Long defaultTTL;
    private Map<String, Long> cachesTTL;
  }
}

SampleInputSink.java

package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface SampleInputSink {

  String SAMPLE = "sampleInput";

  @Input(SAMPLE)
  MessageChannel sampleMessageChannel();
}

SampleOutputSource.java

package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SampleOutputSource {
  String SAMPLE = "sampleOutput";

  @Output(SAMPLE)
  MessageChannel sampleMessageChannel();
}

application.yml

spring:
  application:
    name: redisandkafka
  main:
    allow-bean-definition-overriding: true

  cache:
    cache-names: sample-cache
    default-ttl: 1 # TTL in hours
    enabled: true
    type: redis
    host: localhost
    port: 6379
    caches-ttl: # TTL in hours
      sample-cache: 1

  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:9081
      kafka:
        binder:
          brokers: PLAINTEXT_HOST://localhost:30092
          min-partition-count: 1
          replication-factor: 1
          useNativeDecoding: true
        bindings:
          sampleInput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            consumer:
              configuration:
                schema.registry.url: http://localhost:9081
                specific.avro.reader: true
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                max:
                  poll:
                    records: 100
                    interval.ms: 900000
      bindings:
        sampleInput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event
          group: sample.local.sample_event
        sampleOutput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event_output
          group: sample.local.sample_event
      streams:
        binder:
          configuration:
            default:
              key:
                serde: org.apache.kafka.common.serialization.StringSerializer
              value:
                serde: io.confluent.kafka.serializers.KafkaAvroSerializer

security.basic.enable: false
management.security.enabled: false
security.ignored: /**

Sample.asvc

{
  "namespace": "com.sample.redisandkafka",
  "type": "record",
  "name": "SampleEvent",
  "fields": [
    {"name": "field", "type": "string", "default": "null"}
  ]
}

Update 1: Using the same sample app, we removed redis related dependencies [provided below] and removed the class RedisCacheConfig class and made the application kafka alone and it works well

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yml

  cache:
    cache-names: sample-cache
    default-ttl: 1 # TTL in hours
    enabled: true
    type: redis
    host: localhost
    port: 6379
    caches-ttl: # TTL in hours
      sample-cache: 1

Solution

  • Add config "useNativeEncoding: true" to your producer that way it will bypass the default serializer that is autoconfigured by Spring Boot and will use the Serializer of your binding.

    Note: All the config updates are done in application.yml

    Native Encoding:

         sampleOutput:
           contentType: application/*+avro
           destination: redisandkafka.local.sample_event_output
           group: sample.local.sample_event
           producer:
             useNativeEncoding: true
    

    Also you will have to add your required key.serializer and value.serializer, in your case KafkaAvroSerializer for Value serializer in producer properties.

    Add Serailizer:

          kafka:
            binder:
              producer-properties:
                schema.registry.url: http://localhost:9081
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    

    The serailizer I added above is global configuration. Add "useNativeEncoding: true" to which ever producers needs to bypass the default serializer configured by Spring Boot and use the serializer of your binding.

    Similarly if required you can do the same for deserializer.

    Native Decoding:

          bindings:
            sampleInput:
              contentType: application/*+avro
              destination: redisandkafka.local.sample_event
              group: sample.local.sample_event
              consumer:
                useNativeDecoding: true
    

    Add Deserializer:

          kafka:
            binder:
              consumer-properties:
                schema.registry.url: http://localhost:9081
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    

    PR with My Changes: https://github.com/krishgokul494/redisandkafka/pull/1

    Please let me know for any further queries.