We use Spring Cloud Stream to produce events to Kafka, and everything worked fine until we introduced a Redis cache. For some reason, the combination of Redis and Kafka appears to break serialization. At this stage the Redis cache itself works fine, but emitting an event to Kafka is broken: we get the Redis exception below while producing the event to Kafka.
Error while producing SampleEvent: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'sampleOutput';
nested exception is org.springframework.data.redis.serializer.SerializationException: Could not write JSON: Not an array:
{"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException:
Not an array: {"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]), failedMessage=GenericMessage [payload={"field": "value"}, headers={id=16108acf-6ca1-f1b2-2ed3-44eb857daa6b, contentType=application/*+avro, timestamp=1712741537549}]
Spring Boot: 2.4.5
Spring Cloud: 2020.0.2
This GitHub repo contains a reproducible example:
https://github.com/krishgokul494/redisandkafka.git
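The reference chain in the trace (ParsedSchema["schema"] -> Schema$RecordSchema["elementType"]) suggests that once spring.cache is enabled, the Avro Schema object itself gets handed to the Redis value serializer, and Jackson's bean introspection on Schema$RecordSchema calls getters such as getElementType(), which throw for record schemas. A minimal sketch along these lines (our assumption, not part of the repo) reproduces the failure without Kafka involved:

// Hypothetical standalone reproduction (not in the repo): serialize an Avro
// Schema with the same Redis value serializer the cache config below uses.
import org.apache.avro.Schema;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;

public class SchemaSerializationRepro {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"SampleEvent\","
                + "\"namespace\":\"com.sample.redisandkafka\","
                + "\"fields\":[{\"name\":\"field\",\"type\":\"string\",\"default\":\"null\"}]}");
        // Jackson walks the bean properties of Schema$RecordSchema; calling
        // getElementType() on a record schema throws "Not an array: ...",
        // which surfaces as the SerializationException in the trace above.
        new GenericJackson2JsonRedisSerializer().serialize(schema);
    }
}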
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sample</groupId>
    <artifactId>redisandkafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>redisandkafka</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
        <spring-boot.version>2.4.5</spring-boot.version>
        <avro.version>1.8.2</avro.version>
        <confluent.version>4.0.0</confluent.version>
        <jacoco.version>0.8.5</jacoco.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-schema-registry-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.hibernate</groupId>
                    <artifactId>hibernate-validator</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.sample.redisandkafka.Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>${jacoco.version}</version>
                <configuration>
                    <excludes>
                        <exclude>**/avro/**/*</exclude>
                    </excludes>
                </configuration>
                <executions>
                    <execution>
                        <id>default-prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>default-report</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>kafka-binder</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </profile>
        <profile>
            <id>repo</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <repositories>
                <repository>
                    <id>spring-libs-milestones</id>
                    <name>Spring Milestones</name>
                    <url>https://repo.spring.io/milestone</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>spring-milestones</id>
                    <name>Spring libs-Milestones</name>
                    <url>https://repo.spring.io/libs-milestone/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>confluent</id>
                    <url>http://packages.confluent.io/maven/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
    <pluginRepositories>
        <pluginRepository>
            <id>repository.spring.release</id>
            <name>Spring GA Repository</name>
            <url>https://repo.spring.io/plugins-release/</url>
        </pluginRepository>
    </pluginRepositories>
</project>
SampleKafkaSourceProducer.java
package com.sample.redisandkafka.kafka;

import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleOutputSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(SampleOutputSource.class)
@Slf4j
public class SampleKafkaSourceProducer {

    @Autowired
    private SampleOutputSource sampleOutputSource;

    public boolean produceSampleEvent() {
        SampleEvent sampleEvent = SampleEvent.newBuilder().setField("value").build();
        try {
            sampleOutputSource.sampleMessageChannel().send(MessageBuilder.withPayload(sampleEvent).build());
        } catch (Exception ex) {
            log.error("Error while producing SampleEvent: {}", ex.toString());
            return false;
        }
        return true;
    }
}
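For reference, the entry point that calls this producer isn't shown above; since the app includes spring-boot-starter-web, a hypothetical controller like the one below (names are ours, not from the repo) is enough to exercise it:

package com.sample.redisandkafka.web;

import com.sample.redisandkafka.kafka.SampleKafkaSourceProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical trigger, not part of the repo: POST /produce sends one SampleEvent.
@RestController
public class SampleController {

    @Autowired
    private SampleKafkaSourceProducer producer;

    @PostMapping("/produce")
    public boolean produce() {
        return producer.produceSampleEvent();
    }
}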
KafkaConfig.java
package com.sample.redisandkafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}
RedisCacheConfig.java
package com.sample.redisandkafka.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

@EnableCaching
@Configuration
@ConditionalOnProperty(name = "spring.cache.enabled", havingValue = "true")
public class RedisCacheConfig extends CachingConfigurerSupport {

    @Autowired
    private CacheConfigurationProperties cacheConfigurationProperties = null;

    private RedisCacheConfiguration createCacheConfiguration(long timeoutInHours) {
        return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(timeoutInHours))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }

    @Bean
    public CacheManager cacheManager(LettuceConnectionFactory redisConnectionFactory) {
        Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();
        if (Objects.nonNull(cacheConfigurationProperties.getCachesTTL())) {
            for (Entry<String, Long> cacheNameAndTimeout : cacheConfigurationProperties.getCachesTTL()
                    .entrySet()) {
                cacheConfigurations.put(cacheNameAndTimeout.getKey(),
                        createCacheConfiguration(cacheNameAndTimeout.getValue()));
            }
        }
        return RedisCacheManager.builder(redisConnectionFactory)
                .cacheDefaults(createCacheConfiguration(cacheConfigurationProperties.getDefaultTTL()))
                .withInitialCacheConfigurations(cacheConfigurations).build();
    }

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(
                cacheConfigurationProperties.getHost(), cacheConfigurationProperties.getPort());
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Configuration
    @ConfigurationProperties(prefix = "spring.cache")
    @Data
    class CacheConfigurationProperties {
        private String host;
        private int port;
        private Long defaultTTL;
        private Map<String, Long> cachesTTL;
    }
}
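For context on how the cache is exercised: every value cached through this manager is written with GenericJackson2JsonRedisSerializer. The caller isn't shown in the question, but a hypothetical service using the configured sample-cache would look like this:

package com.sample.redisandkafka.cache;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

// Hypothetical service, not part of the repo: illustrates how entries land in
// the "sample-cache" configured above via GenericJackson2JsonRedisSerializer.
@Service
public class SampleCacheService {

    @Cacheable(cacheNames = "sample-cache", key = "#id")
    public String lookup(String id) {
        // Runs only on a cache miss; the result is stored in Redis for the TTL
        // configured under spring.cache.caches-ttl.sample-cache.
        return "value-for-" + id;
    }
}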
SampleInputSink.java
package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface SampleInputSink {

    String SAMPLE = "sampleInput";

    @Input(SAMPLE)
    MessageChannel sampleMessageChannel();
}
SampleOutputSource.java
package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SampleOutputSource {

    String SAMPLE = "sampleOutput";

    @Output(SAMPLE)
    MessageChannel sampleMessageChannel();
}
application.yml
spring:
  application:
    name: redisandkafka
  main:
    allow-bean-definition-overriding: true
  cache:
    cache-names: sample-cache
    default-ttl: 1 # TTL in hours
    enabled: true
    type: redis
    host: localhost
    port: 6379
    caches-ttl: # TTL in hours
      sample-cache: 1
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:9081
      kafka:
        binder:
          brokers: PLAINTEXT_HOST://localhost:30092
          min-partition-count: 1
          replication-factor: 1
          useNativeDecoding: true
        bindings:
          sampleInput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            consumer:
              configuration:
                schema.registry.url: http://localhost:9081
                specific.avro.reader: true
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                max:
                  poll:
                    records: 100
                    interval.ms: 900000
      bindings:
        sampleInput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event
          group: sample.local.sample_event
        sampleOutput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event_output
          group: sample.local.sample_event
      streams:
        binder:
          configuration:
            default:
              key:
                serde: org.apache.kafka.common.serialization.StringSerializer
              value:
                serde: io.confluent.kafka.serializers.KafkaAvroSerializer

security.basic.enable: false
management.security.enabled: false
security.ignored: /**
Sample.avsc
{
  "namespace": "com.sample.redisandkafka",
  "type": "record",
  "name": "SampleEvent",
  "fields": [
    {"name": "field", "type": "string", "default": "null"}
  ]
}
Update 1:
Using the same sample app, we removed the Redis-related dependency and configuration [shown below] along with the RedisCacheConfig class, making the application Kafka-only, and it works fine.
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
application.yml
cache:
  cache-names: sample-cache
  default-ttl: 1 # TTL in hours
  enabled: true
  type: redis
  host: localhost
  port: 6379
  caches-ttl: # TTL in hours
    sample-cache: 1
Add the config useNativeEncoding: true to your producer; that way it will bypass the default serialization that Spring Cloud Stream applies through its message converters and use the serializer of your binding instead.
Note: All the config updates are done in application.yml
Native Encoding:
sampleOutput:
  contentType: application/*+avro
  destination: redisandkafka.local.sample_event_output
  group: sample.local.sample_event
  producer:
    useNativeEncoding: true
You will also have to add the required key.serializer and value.serializer, in your case KafkaAvroSerializer as the value serializer, in the producer properties.
Add Serializer:
kafka:
  binder:
    producer-properties:
      schema.registry.url: http://localhost:9081
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
The serializer configuration added above is global. Add useNativeEncoding: true to whichever producer bindings need to bypass the default serializer and use the serializer of their binding, as in the Native Encoding snippet above.
Similarly, if required, you can do the same for the deserializer.
Native Decoding:
bindings:
  sampleInput:
    contentType: application/*+avro
    destination: redisandkafka.local.sample_event
    group: sample.local.sample_event
    consumer:
      useNativeDecoding: true
Add Deserializer:
kafka:
  binder:
    consumer-properties:
      schema.registry.url: http://localhost:9081
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
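With native decoding and specific.avro.reader: true, the Kafka client runs KafkaAvroDeserializer itself, so the listener receives the generated SpecificRecord directly. A minimal consumer sketch (hypothetical; the repo's consumer is not shown in the question) would be:

package com.sample.redisandkafka.kafka;

import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleInputSink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

// Hypothetical consumer: with useNativeDecoding the payload arriving here is
// already a SampleEvent; no Spring message conversion is involved.
@EnableBinding(SampleInputSink.class)
@Slf4j
public class SampleKafkaSinkConsumer {

    @StreamListener(SampleInputSink.SAMPLE)
    public void onSampleEvent(SampleEvent event) {
        log.info("Received SampleEvent with field={}", event.getField());
    }
}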
PR with My Changes: https://github.com/krishgokul494/redisandkafka/pull/1
Please let me know if you have any further questions.