I am a beginner who has just started developing with pulsar-client in Spring Boot. I learned the basics from the Pulsar docs and git, but I am stuck testing batch sending of messages from the pulsar-client producer. In particular, I want to send JsonArray data in batches, but I keep getting a JsonArray.getAsInt error. Please take a look at my code and tell me what's wrong.
package com.refactorizando.example.pulsar.producer;
import static java.util.stream.Collectors.toList;
import com.refactorizando.example.pulsar.config.PulsarConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.com.google.gson.JsonArray;
import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class PulsarProducer {
private static final String TOPIC_NAME = "Json_Test";
private final PulsarClient client;
@Bean(name = "producer")
public void producer() throws PulsarClientException {
// batching
Producer<JsonArray> producer = client.newProducer(JSONSchema.of(JsonArray.class))
.topic(TOPIC_NAME)
.batchingMaxPublishDelay(60, TimeUnit.SECONDS)
.batchingMaxMessages(2)
.enableBatching(true)
.compressionType(CompressionType.LZ4)
.create();
String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'},{'userId': 2,'firstName': 'BBBB'},{'userId': 3,'firstName': 'CCCCC'},{'userId': 4,'firstName': 'DDDDD'},{'userId': 5,'firstName': 'EEEEE'}]}";
JsonElement element = JsonParser.parseString(data);
JsonObject obj = element.getAsJsonObject();
JsonArray arr = obj.getAsJsonArray("users");
try {
producer.send(arr);
} catch (Exception e) {
log.error("Error sending message");
e.printStackTrace();
}
producer.close();
}
}
I'm still a beginner developer, so I couldn't find an answer on Stack Overflow, probably because I didn't search well. If there is a related question, please leave a link and I'll delete this one. Thanks for reading my question and have a nice day!
I tried several things, such as converting to JsonObject and sending, converting to String and sending, etc., but the same error came out.
cho, welcome to Pulsar and Spring Pulsar! I believe there are a few things to cover to fully answer your question.
In your example you are crafting a Producer directly from the PulsarClient. There is absolutely nothing wrong/bad about using that API directly. However, if you want to use Spring Pulsar, the recommended approach to send messages in a Spring Boot app using Spring Pulsar is via the auto-configured PulsarTemplate (or ReactivePulsarTemplate if using Reactive). It simplifies usage and allows configuring the template/producer using configuration properties. For example, instead of building up and then using Producer.send(), you would instead inject the pulsar template and use it as follows:
pulsarTemplate.newMessage(foo)
    .withTopic("Json_Test")
    .withSchema(Schema.JSON(Foo.class))
    .withProducerCustomizer((producerBuilder) -> {
        producerBuilder
            .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
            .batchingMaxMessages(2)
            .enableBatching(true)
            .compressionType(CompressionType.LZ4);
    })
    .send();
Furthermore you can replace the builder calls w/ configuration properties like:
spring:
  pulsar:
    producer:
      batching-enabled: true
      batching-max-publish-delay: 60s
      batching-max-messages: 2
      compression-type: lz4
and then your code becomes:
pulsarTemplate.newMessage(foo)
    .withTopic("Json_Test")
    .withSchema(Schema.JSON(Foo.class))
    .send();
NOTE: I replaced the JSON array with Foo for simplicity.
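For intuition about what batchingMaxMessages(2) asks for, here is a toy model that groups five queued messages into batches of at most two. It uses only the JDK and is not how the Pulsar client is implemented; the real producer can also flush a batch earlier, for example when the publish delay elapses. BatchSketch and group(...) are hypothetical names I made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: groups messages by count, the way batchingMaxMessages caps a batch.
// BatchSketch and group(...) are hypothetical names, not part of the Pulsar API.
class BatchSketch {

    // Splits the messages into consecutive batches of at most maxMessages entries.
    static <T> List<List<T>> group(List<T> messages, int maxMessages) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < messages.size(); i += maxMessages) {
            int end = Math.min(i + maxMessages, messages.size());
            batches.add(new ArrayList<>(messages.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> users = List.of("user1", "user2", "user3", "user4", "user5");
        // prints [[user1, user2], [user3, user4], [user5]]
        System.out.println(group(users, 2));
    }
}
```

The point for your case: batching groups separate messages, so five users only end up in batches if they are sent as five messages rather than as one message holding an array.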
In Pulsar, the Schema knows how to serialize and deserialize the data. The built-in Pulsar Schema.JSON by default uses the Jackson JSON lib to do so. This requires that the data can be handled by the Jackson ObjectMapper.readValue/writeValue methods. It handles POJOs really well, but does not handle the JSON impl you are using. The getAsInt error is most likely Jackson treating your JsonArray as a plain bean and calling its getter-style methods such as getAsInt(), which fail on a multi-element array.
I noticed the latest json-lib is 2.4, which (AFAICT) has 9 CVEs against it and was last released in 2010. If I had to use a JSON-level API for my data, I would pick a more contemporary and well-supported lib such as Jackson or Gson.
I switched your sample to use Jackson ArrayNode and it worked well. I did have to replace the single quotes in your data string with backslash-escaped double quotes, as Jackson by default does not accept single-quoted data. Here is the re-worked sample app using Jackson ArrayNode:
@SpringBootApplication
public class HyunginChoSpringPulsarUserApp {
public static void main(String[] args) {
SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
}
@Bean
ApplicationRunner sendDataOnStartup(PulsarTemplate<ArrayNode> pulsarTemplate) {
return (args) -> {
String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
ArrayNode jsonArray = (ArrayNode) ObjectMapperFactory.create().readTree(data2).get("users");
System.out.printf("*** SENDING: %s%n", jsonArray);
pulsarTemplate.newMessage(jsonArray)
.withTopic("Json_Test")
.withSchema(Schema.JSON(ArrayNode.class))
.send();
};
}
@PulsarListener(topics = "Json_Test", schemaType = SchemaType.JSON, batch = true)
public void listenForData(List<ArrayNode> user) {
System.out.printf("***** LISTEN: %s%n", user);
}
}
The output looks like:
*** SENDING: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]
***** LISTEN: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]
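As an aside on the escaped data string in the sample above: if you are on Java 15 or newer (an assumption on my part, though the record and formatted() usage here already needs a recent JDK), a text block lets you write the JSON without backslashes. A minimal JDK-only sketch, shortened to two users; TextBlockJson is a hypothetical class name:

```java
// Sketch assuming Java 15 or newer, where text blocks are available.
// TextBlockJson is a hypothetical name used only for this illustration.
class TextBlockJson {

    // Same shape as the sample payload (two users only), written without backslash escapes.
    static final String DATA = """
            {"users": [{"userId": 1,"firstName": "AAAAA"},{"userId": 2,"firstName": "BBBB"}]}
            """;

    // The escaped single-line form, for comparison.
    static final String ESCAPED =
            "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"}]}";

    public static void main(String[] args) {
        // The text block ends with a newline because the closing delimiter is on its own line,
        // so strip() is applied before comparing. prints true
        System.out.println(DATA.strip().equals(ESCAPED));
    }
}
```

Either form can be passed to readTree; the text block is just easier to read and edit.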
Your data is an array of users. Do you have a requirement to use a JSON-level API, or could you instead deal with List<User> POJOs? That would simplify things and make for a much better experience. A Java record is a great choice, such as:
public record User(String userId, String firstName) {}
Then you can convert the data to a List<User>, send each User with your PulsarTemplate, and everything will work well. For example:
@SpringBootApplication
public class HyunginChoSpringPulsarUserApp {
public static void main(String[] args) {
SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
}
@Bean
ApplicationRunner sendDataOnStartup(PulsarTemplate<User> pulsarTemplate) {
return (args) -> {
String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
ObjectMapper objectMapper = ObjectMapperFactory.create();
JsonNode usersNode = objectMapper.readTree(data2).get("users");
List<User> users = objectMapper.convertValue(usersNode, new TypeReference<>() {});
System.out.printf("*** SENDING: %s%n", users);
for (User user : users) {
pulsarTemplate.newMessage(user)
.withTopic("Json_Test2")
.withSchema(Schema.JSON(User.class))
.send();
}
};
}
@PulsarListener(topics = "Json_Test2", schemaType = SchemaType.JSON, batch = true)
public void listenForData(List<User> users) {
users.forEach((user) -> System.out.printf("***** LISTEN: %s%n", user));
}
public record User(String userId, String firstName) {}
}
The output looks like:
*** SENDING: [User[userId=1, firstName=AAAAA], User[userId=2, firstName=BBBB], User[userId=3, firstName=CCCCC], User[userId=4, firstName=DDDDD], User[userId=5, firstName=EEEEE]]
...
***** LISTEN: User[userId=1, firstName=AAAAA]
***** LISTEN: User[userId=2, firstName=BBBB]
***** LISTEN: User[userId=3, firstName=CCCCC]
***** LISTEN: User[userId=4, firstName=DDDDD]
***** LISTEN: User[userId=5, firstName=EEEEE]
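In case the format of those LISTEN lines looks odd: it is simply the toString() that Java generates for a record. A minimal JDK-only sketch (needs Java 16 or newer; RecordSketch is a hypothetical wrapper class used only here):

```java
// JDK-only sketch of what a record gives you for free.
// RecordSketch is a hypothetical wrapper; User mirrors the record in the sample above.
class RecordSketch {

    record User(String userId, String firstName) {}

    public static void main(String[] args) {
        User user = new User("1", "AAAAA");
        // Generated toString(): prints User[userId=1, firstName=AAAAA]
        System.out.println(user);
        // Generated equals() compares component values: prints true
        System.out.println(user.equals(new User("1", "AAAAA")));
    }
}
```

You also get accessors such as userId() and firstName() without writing any boilerplate, which is why a record works nicely as a message payload type.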
I hope this helps. Take care.