apache-kafka microservices spring-kafka illegalstateexception

How to solve 'java.lang.IllegalStateException: No thread-bound request found: [...]'?


BRIEF EXPLANATION:

I am developing a micro-service (MS-1) that at some point needs to fetch information from another micro-service (MS-2) through a REST call.

MS-1 has a public endpoint and also listens to Kafka messages from a third micro-service (MS-3). Later, the endpoint will be removed and MS-1 will listen to Kafka messages from MS-2 only.

PROBLEM:

When MS-1 is called through the endpoint, everything works and it performs the required REST call correctly: MS-1 CAN CALL MS-2 AND OBTAIN THE CORRECT RESPONSE.

However, when the same request reaches MS-1 through Kafka, that is, when MS-1 consumes a message sent by MS-3, the following exception is raised as soon as MS-1 tries to call MS-2 over REST: MS-1 CANNOT CALL MS-2.

EXCEPTION:

java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:370) ~[spring-web-6.0.10.jar:6.0.10]   
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:365) ~[spring-web-6.0.10.jar:6.0.10]   
at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:283) ~[spring-beans-6.0.10.jar:6.0.10]    
at jdk.proxy2/jdk.proxy2.$Proxy190.getAttribute(Unknown Source) ~[na:na]    
at [PROTECTED PATH].getHeaders(RestProxyService.java:65) ~[classes/:na]     
at [PROTECTED PATH].restExchange(RestProxyService.java:431) ~[classes/:na]  
at [PROTECTED PATH].callRemoteApi(RestProxyService.java:346) ~[classes/:na]     
at [PROTECTED PATH].callRestApi(RestProxyService.java:426) ~[classes/:na]   
at [PROTECTED PATH].getDetail(MyProxy.java:57) ~[classes/:na]   
at [PROTECTED PATH].entriesToReport(EntityWithDataService.java:75) ~[classes/:na]
at [PROTECTED PATH].getReportObjects(SupportedReportCodes.java:69) ~[classes/:na]
at [PROTECTED PATH].generate(ReportService.java:72) ~[classes/:na]  
at [PROTECTED PATH].receive(KafkaConsumer.java:25) ~[classes/:na]   
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]  
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]    
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]    
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]  
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.10.jar:6.0.10]  
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.10.jar:6.0.10]    
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2924) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.8.jar:3.0.8]  
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]     
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.1.jar:1.11.1]   
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.8.jar:3.0.8]  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.8.jar:3.0.8]    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.8.jar:3.0.8]   
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.8.jar:3.0.8]     
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.8.jar:3.0.8]   
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]  
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

I have changed some configuration to try to solve it. For now, my application.properties and classes look like this:

CONSUMER (listener) -> MS-1

application.properties

kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED]     
#spring.kafka.producer.topic=report     
spring.kafka.consumer.group-id=reports     
spring.kafka.consumer.topic=report     
spring.kafka.consumer.auto-offset-reset=earliest     
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer     
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer     
spring.kafka.consumer.properties.spring.json.trusted.packages=*     
spring.kafka.consumer.properties.spring.json.type.mapping=log:[PROTECTED PATH].ReportMessageObject

class KafkaConsumer

@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private MyService myService;
    
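    // Each "properties" entry uses Java Properties syntax (key=value), so the braces belong outside the string.
    @KafkaListener(topics = "${spring.kafka.consumer.topic}", errorHandler = "kafkaEventErrorHandler", properties = {"spring.json.value.default.type=[PROTECTED PATH].ReportMessageObject"})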
    public void receive(@Payload @Valid ConsumerRecord<String, ReportMessageObject> payload) {
        try {
            log.debug("received payload={}", payload.toString());
            myService.generate(payload.value().getCompanyId(), payload.value().getBody());
        } catch (Exception e) {
            log.error("Exception occurred while consuming ExternalInterface message {}", e.getMessage(), e);
        }
    }

}

class KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaAddress;
    
    @Value(value = "${spring.kafka.consumer.group-id: null}")
    private String kafkaGroup;
    
    @Value("${kafka.configuration.security.enabled:false}")
    private boolean kafkaSecurityEnabled;
    
    @Value("${spring.kafka.properties.security.protocol:null}")
    private String kafkaSecurityProtocol;
    
    @Value("${spring.kafka.properties.sasl.mechanism:null}")
    private String kafkaSecurityMechanism;
    
    @Value("${spring.kafka.properties.sasl.jaas.config:null}")
    private String kafkaSecurityJaasConfig;
    
    @Bean
    public ConsumerFactory<String, ReportMessageObject> responseConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getKafkaConsumerConfigProperties(kafkaGroup), new StringDeserializer(), new JsonDeserializer<>(ReportMessageObject.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> responseKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(responseConsumerFactory());
        return factory;
    }
    
    private Map<String, Object> getKafkaConsumerConfigProperties(String groupId) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
        if(groupId != null && !groupId.isEmpty()) {
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
        
        if (kafkaSecurityEnabled) {
            configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
            configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
        }
        
        return configProps;
    }

}

PRODUCER -> MS-3

application.properties

kafka.security.enabled=false     
spring.kafka.bootstrap-servers=[PROTECTED PATH]     
spring.kafka.producer.topic=report

class KafkaProducer

@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, ReportMessageObject> kafkaTemplate;
    @Autowired
    private SmartKafkaHeader smartKafkaHeader;
    
    private String property1;
    private String property2;
    private String property3;
    private String property4;
    
    public void addHeaders(String property1, String property2, String property3, String property4) {
        this.property1 = property1;
        this.property2 = property2;
        this.property3 = property3;
        this.property4 = property4;
    }
    
    public void send(String topic, ReportMessageObject reportMessageObject) {
    
        ProducerRecord<String, ReportMessageObject> message = new ProducerRecord<>(topic, reportMessageObject);
    
        smartKafkaHeader.addHeadersToRecord(message, property1, property2, property3, property4);
    
        kafkaTemplate.send(message);
        kafkaTemplate.flush();
    
        log.info("sending payload='{}' to topic='{}'", reportMessageObject, topic);
    }
    
}

class KafkaProducerConfig

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaAddress;
    
    @Value(value = "${spring.kafka.producer.group-id: null}")
    private String kafkaGroup;
    
    @Value("${kafka.configuration.security.enabled:false}")
    private boolean kafkaSecurityEnabled;
    
    @Value("${spring.kafka.properties.security.protocol:null}")
    private String kafkaSecurityProtocol;
    
    @Value("${spring.kafka.properties.sasl.mechanism:null}")
    private String kafkaSecurityMechanism;
    
    @Value("${spring.kafka.properties.sasl.jaas.config:null}")
    private String kafkaSecurityJaasConfig;
    
    @Bean
    public ProducerFactory<String, ReportMessageObject> producerFactorySmartMessaging() {
        return new DefaultKafkaProducerFactory<>(getKafkaProducerConfigProperties(kafkaGroup));
    }
    
    @Bean
    public KafkaTemplate<String, ReportMessageObject> kafkaMessaging() {
        return new KafkaTemplate<>(producerFactorySmartMessaging());
    }
    
    private Map<String, Object> getKafkaProducerConfigProperties(String groupId){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
        if(groupId != null && !groupId.isEmpty()) {
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
    
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.TYPE_MAPPINGS, "log:[PROTECTED PATH].ReportMessageObject");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        
        if (kafkaSecurityEnabled) {
            configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
            configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
        }
    
        return configProps;
    }

}

Solution

  • The error is quite clear:

    java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

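    There is no web request context when the message comes from Kafka; a request context exists only while a web request is being handled. In your stack trace, RestProxyService.getHeaders calls getAttribute on an injected session proxy, and that proxy needs the current request, which the Kafka listener thread does not have.

    You can't use that code unchanged in both places.

    You would need to propagate any such information via record headers so that it is available to the downstream consumer, and read it from the headers there rather than from the web context.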
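
To make the mechanism concrete, here is a minimal plain-Java sketch with no Spring or Kafka dependencies. It is a simplified model, not Spring's implementation: the `REQUEST` thread-local stands in for the thread-bound request that `RequestContextHolder` manages, and the names `ThreadBoundContextDemo`, `tokenFromRequest`, `tokenFromHeaders`, `callOnListenerThread` and the `token` key are all hypothetical. It shows why a lookup that works on the web thread fails on a listener thread, and how passing the value in explicitly (as you would with record headers) avoids the problem.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class ThreadBoundContextDemo {

    // Stand-in for the thread-bound web request: each thread sees only its own value.
    static final ThreadLocal<Map<String, String>> REQUEST = new ThreadLocal<>();

    // Mirrors the failing getHeaders(): it depends on a request bound to the current thread.
    static String tokenFromRequest() {
        Map<String, String> attributes = REQUEST.get();
        if (attributes == null) {
            throw new IllegalStateException("No thread-bound request found");
        }
        return attributes.get("token");
    }

    // Context-free alternative: the value arrives with the message (here a map of
    // header name to bytes, the shape Kafka record headers have).
    static String tokenFromHeaders(Map<String, byte[]> recordHeaders) {
        return new String(recordHeaders.get("token"), StandardCharsets.UTF_8);
    }

    // Runs the call on a separate thread, the way a listener container would,
    // and reports either the result or the failure message.
    static String callOnListenerThread(Supplier<String> call) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Thread listener = new Thread(() -> {
            try {
                outcome.set("ok: " + call.get());
            } catch (IllegalStateException e) {
                outcome.set("failed: " + e.getMessage());
            }
        }, "kafka-listener");
        listener.start();
        try {
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listener", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        // HTTP path: the request is bound to the thread that handles it.
        REQUEST.set(Map.of("token", "abc"));
        System.out.println("web thread -> " + tokenFromRequest());

        // Kafka path: the listener thread never had a request bound to it.
        System.out.println(callOnListenerThread(ThreadBoundContextDemo::tokenFromRequest));

        // Fix: carry the value with the message and read it from there.
        Map<String, byte[]> headers = Map.of("token", "abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(callOnListenerThread(() -> tokenFromHeaders(headers)));
    }
}
```

In the real application the map would be replaced by the headers of the consumed record: the listener's `receive` method already gets a `ConsumerRecord`, which exposes them through `headers()`, and the producer in MS-3 already attaches headers via `smartKafkaHeader.addHeadersToRecord`. The REST helper would then take the values as parameters instead of reading them from the session.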