I am trying to implement a simple message queue using Spring Boot and Apache Pulsar. The system works fine with String payloads but not with a custom object. Below are the object I'm using, the producer, and the consumer:
Custom Object:
package com.example.springpulsar;

public class User {
    private String email;
    private String firstName;
    // constructors, getters and setters
}
Producer:
package com.example.springpulsar;

// imports

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate<User> template;

    private static final String USER_TOPIC = "user-topic";

    public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
        template.send(USER_TOPIC, user);
    }
}
Consumer:
package com.example.springpulsar;

// imports

@Service
public class PulsarConsumer {

    private static final String USER_TOPIC = "user-topic";
    private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);

    @PulsarListener(
        subscriptionName = "user-topic-subscription",
        topics = USER_TOPIC,
        subscriptionType = SubscriptionType.Shared,
        schemaType = SchemaType.JSON
    )
    public void userTopicListener(User user) {
        LOGGER.info("Received user object with email: {}", user.getEmail());
    }
}
Gradle dependency: org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0
Application properties:
spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.example.springpulsar.User
          schema-info:
            schema-type: JSON
I read somewhere that adding the schemaType in the Listener annotation should work, but it hasn't. Debug logs:
2023-06-20T14:41:24.050+05:30 DEBUG 32777 --- [nio-8085-exec-5] o.a.p.c.impl.BatchMessageContainerImpl : [user-topic] [null] add message to batch, num messages in batch so far 0
2023-06-20T14:41:24.053+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [user-topic] [standalone-1-21] Sending message cnx org.apache.pulsar.client.impl.ClientCnx@6f18e7a7, sequenceId 1
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder : [localhost/127.0.0.1:6650] Received cmd SEND_RECEIPT
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Got receipt for producer: 0 -- msg: 1 -- id: 22:42
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [user-topic] [standalone-1-21] Received ack for msg 1
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder : [localhost/127.0.0.1:6650] Received cmd MESSAGE
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@697f429b
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl : [user-topic][user-topic-subscription] Received message: 22/42
I do see that the ConsumerImpl receives the message, but the listener method is never called.
Any help is appreciated. Thank you!
The problem is that your domain object is missing a default (no-argument) constructor, so Jackson throws an internal exception when it tries to deserialize the message. Please refer to the Jackson docs for more details on this topic. You can add a default constructor to your User domain class. Alternatively, you can switch your domain class to a Java record, as below:
public record User(String email, String firstName){}
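If you prefer to keep a regular class, the default-constructor option could look roughly like the sketch below. It assumes the class only has the two fields shown in the question; the package declaration is omitted here for brevity, and the getter and setter names are the conventional JavaBean ones, which I am guessing match your elided code.

```java
// Sketch of the User class from the question with a no-argument constructor added.
// Assumption: only the two fields shown in the question exist.
public class User {

    private String email;
    private String firstName;

    // No-argument constructor: lets Jackson create an empty instance
    // and then populate it through the setters below.
    public User() {
    }

    // Convenience constructor for application code (e.g. the producer side).
    public User(String email, String firstName) {
        this.email = email;
        this.firstName = firstName;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
}
```

Once a class declares any constructor with arguments, Java no longer generates the implicit no-argument one, so it has to be written out explicitly as above.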
What was happening is that the framework caught the exception internally and interpreted it as a signal to negatively acknowledge (nack) the message. Each nacked message is redelivered to the Pulsar consumer every 60 seconds by default. This is why you were not seeing any exception stack trace on the console. We can add a debug log to make these errors surface sooner.
You can also add a dead letter policy for the consumer so that the failed message doesn't get redelivered every minute indefinitely. See this docs section for more details: https://docs.spring.io/spring-pulsar/docs/current-SNAPSHOT/reference/html/#_using_dead_letter_topic_from_apache_pulsar_for_message_redelivery_and_error_handling. However, in your case, all messages would still fail without the fixes suggested here, since the underlying domain class cannot be deserialized.