Tags: java, spring-boot, apache-pulsar, pulsar, spring-pulsar

How to send JsonArray data in apache-pulsar-client?


I am a beginner who just started developing a Pulsar client with Spring Boot. I learned the basics from the Pulsar docs and the Git repository, but I got stuck testing batch transmission of messages from the pulsar-client producer. In particular, I want to send JsonArray data in batches, but I keep getting a JsonArray.getAsInt error. Please take a look at my code and tell me what's wrong.

package com.refactorizando.example.pulsar.producer;

import static java.util.stream.Collectors.toList;

import com.refactorizando.example.pulsar.config.PulsarConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.com.google.gson.JsonArray;
import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class PulsarProducer {

  private static final String TOPIC_NAME = "Json_Test";

  private final PulsarClient client;


  @Bean(name = "producer")
  public void producer() throws PulsarClientException {
   
    // batching
    Producer<JsonArray> producer = client.newProducer(JSONSchema.of(JsonArray.class))
        .topic(TOPIC_NAME)
        .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
        .batchingMaxMessages(2)
        .enableBatching(true)
        .compressionType(CompressionType.LZ4)
        .create();
    String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'},{'userId': 2,'firstName': 'BBBB'},{'userId': 3,'firstName': 'CCCCC'},{'userId': 4,'firstName': 'DDDDD'},{'userId': 5,'firstName': 'EEEEE'}]}";
    JsonElement element = JsonParser.parseString(data);
    JsonObject obj = element.getAsJsonObject();
    JsonArray arr = obj.getAsJsonArray("users");
    try {
      producer.send(arr);
    } catch (Exception e) {
      log.error("Error sending message");
      e.printStackTrace();
    }

    producer.close();

  }

}

I'm still a beginner developer, and I couldn't find an existing answer on Stack Overflow, probably because I didn't search well. If there is a related question, please leave a link and I'll delete this one. Thanks for reading my question and have a nice day!

I tried several things, such as converting to JsonObject before sending and converting to String before sending, but I got the same error each time.


Solution

  • Cho, welcome to Pulsar and Spring Pulsar! I believe there are a few things to cover to fully answer your question.

    Spring Pulsar Usage

    In your example you are crafting a Producer directly from the PulsarClient. There is absolutely nothing wrong/bad about using that API directly. However, if you want to use Spring Pulsar, the recommended approach to send messages in a Spring Boot app using Spring Pulsar is via the auto-configured PulsarTemplate (or ReactivePulsarTemplate if using Reactive). It simplifies usage and allows configuring the template/producer using configuration properties. For example, instead of building up and then using Producer.send() you would instead inject the pulsar template and use it as follows:

    pulsarTemplate.newMessage(foo)
        .withTopic("Json_Test")
        .withSchema(Schema.JSON(Foo.class))
        .withProducerCustomizer((producerBuilder) -> {
          producerBuilder
              .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
              .batchingMaxMessages(2)
              .enableBatching(true)
              .compressionType(CompressionType.LZ4);
        })
        .send();
    

    Furthermore you can replace the builder calls w/ configuration properties like:

    spring:
      pulsar:
        producer:
          batching-enabled: true
          batching-max-publish-delay: 60s
          batching-max-messages: 2
          compression-type: lz4
    

    and then your code becomes:

    pulsarTemplate.newMessage(foo)
        .withTopic("Json_Test")
        .withSchema(Schema.JSON(Foo.class))
        .send();
    

    NOTE: I replaced the JSON array w/ Foo for simplicity.

    Schemas

    In Pulsar, the Schema knows how to de/serialize the data. The built-in Pulsar Schema.JSON by default uses the Jackson json lib to de/serialize the data. This requires that the data must be able to be handled by Jackson ObjectMapper.readValue/writeValue methods. It handles POJOs really well, but does not handle the JSON impl you are using.
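
    To illustrate why a getter-driven serializer can fail on a JSON tree type, here is a minimal standalone sketch. It is an assumption about the cause of the getAsInt error, not something confirmed above: a bean-style serializer calls every public getter it finds, and a convenience getter that is only valid for single-element arrays throws when the array holds more. FakeJsonArray and describe are hypothetical names for illustration only; no Pulsar, Jackson, or Gson classes are used.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.List;

public class GetterSerializationSketch {

    // Hypothetical stand-in for a JSON tree type that exposes "getAs..." convenience getters.
    public static class FakeJsonArray {
        private final List<Integer> elements;

        public FakeJsonArray(List<Integer> elements) {
            this.elements = elements;
        }

        // Only meaningful for a single-element array; otherwise it throws.
        public int getAsInt() {
            if (elements.size() == 1) {
                return elements.get(0);
            }
            throw new IllegalStateException("not a single-element array");
        }
    }

    // Naive getter-driven "serializer": calls every public no-arg getXxx() declared on the class.
    public static String describe(Object bean) {
        StringBuilder out = new StringBuilder();
        for (Method m : bean.getClass().getDeclaredMethods()) {
            boolean isGetter = Modifier.isPublic(m.getModifiers())
                    && m.getName().startsWith("get")
                    && m.getParameterCount() == 0;
            if (!isGetter) {
                continue;
            }
            try {
                out.append(m.getName()).append('=').append(m.invoke(bean)).append(';');
            } catch (InvocationTargetException e) {
                // The getter itself threw; report which one and why.
                out.append(m.getName()).append(" failed: ")
                   .append(e.getCause().getClass().getSimpleName()).append(';');
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(new FakeJsonArray(List.of(7))));       // getter succeeds
        System.out.println(describe(new FakeJsonArray(List.of(1, 2, 3)))); // getter throws
    }
}
```

    A plain POJO or record only exposes getters that always succeed, which is why those types serialize cleanly with this kind of approach.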

    I noticed the latest json-lib is 2.4 and (AFAICT) has 9 CVEs against it and was last released in 2010. If I had to use a Json level API for my data I would pick a more contemporary and well supported / used lib such as Jackson or Gson.

    I switched your sample to use Jackson ArrayNode and it worked well. I did have to replace the single quotes in your data string with escaped double quotes (\"), as Jackson by default does not accept single-quoted data. Here is the re-worked sample app using Jackson ArrayNode:

    @SpringBootApplication
    public class HyunginChoSpringPulsarUserApp {
    
        public static void main(String[] args) {
            SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
        }
    
        @Bean
        ApplicationRunner sendDataOnStartup(PulsarTemplate<ArrayNode> pulsarTemplate) {
            return (args) -> {
                String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
                ArrayNode jsonArray = (ArrayNode) ObjectMapperFactory.create().readTree(data2).get("users");
                System.out.printf("*** SENDING: %s%n", jsonArray);
                pulsarTemplate.newMessage(jsonArray)
                        .withTopic("Json_Test")
                        .withSchema(Schema.JSON(ArrayNode.class))
                        .send();
            };
        }
    
        @PulsarListener(topics = "Json_Test", schemaType = SchemaType.JSON, batch = true)
        public void listenForData(List<ArrayNode> user) {
            System.out.printf("***** LISTEN: %s%n", user);
        }
    }
    

    The output looks like:

    *** SENDING: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]
    
    ***** LISTEN: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]
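
    If hand-editing the quotes in the data string is inconvenient, the replacement can also be done in code. The following is a minimal sketch in plain Java; toDoubleQuoted is a hypothetical helper, and it assumes that no key or value contains an apostrophe, since a blind character swap would otherwise produce invalid JSON.

```java
public class QuoteNormalizationSketch {

    // Naive conversion: swaps every single quote for a double quote.
    // Assumption: no key or value contains an apostrophe.
    public static String toDoubleQuoted(String singleQuotedJson) {
        return singleQuotedJson.replace('\'', '"');
    }

    public static void main(String[] args) {
        String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'}]}";
        System.out.println(toDoubleQuoted(data));
        // prints {"users": [{"userId": 1,"firstName": "AAAAA"}]}
    }
}
```

    Jackson also has a parser feature for accepting single-quoted input (JsonParser.Feature.ALLOW_SINGLE_QUOTES), which may be a more robust option if you control the ObjectMapper that parses the string.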
    

    Data Model

    Your data is an array of users. Do you have a requirement to use a JSON-level API, or could you instead deal with a List<User> of POJOs? That would simplify things and make for a much better experience. A Java record is a great choice, such as:

    public record User(String userId, String firstName) {}
    

    Then you can convert the data to a List<User> and send each User with your PulsarTemplate, and everything will work well. For example:

    @SpringBootApplication
    public class HyunginChoSpringPulsarUserApp {
    
        public static void main(String[] args) {
            SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
        }
    
        @Bean
        ApplicationRunner sendDataOnStartup(PulsarTemplate<User> pulsarTemplate) {
            return (args) -> {
                String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
                ObjectMapper objectMapper = ObjectMapperFactory.create();
                JsonNode usersNode = objectMapper.readTree(data2).get("users");
                List<User> users = objectMapper.convertValue(usersNode, new TypeReference<>() {});
                System.out.printf("*** SENDING: %s%n", users);
                for (User user : users) {
                    pulsarTemplate.newMessage(user)
                            .withTopic("Json_Test2")
                            .withSchema(Schema.JSON(User.class))
                            .send();
                }
            };
        }
    
        @PulsarListener(topics = "Json_Test2", schemaType = SchemaType.JSON, batch = true)
        public void listenForData(List<User> users) {
            users.forEach((user) -> System.out.printf("***** LISTEN: %s%n", user));
        }
    
        public record User(String userId, String firstName) {}
    }
    
    *** SENDING: [User[userId=1, firstName=AAAAA], User[userId=2, firstName=BBBB], User[userId=3, firstName=CCCCC], User[userId=4, firstName=DDDDD], User[userId=5, firstName=EEEEE]]
    ...
    ***** LISTEN: User[userId=1, firstName=AAAAA]
    ***** LISTEN: User[userId=2, firstName=BBBB]
    ***** LISTEN: User[userId=3, firstName=CCCCC]
    ***** LISTEN: User[userId=4, firstName=DDDDD]
    ***** LISTEN: User[userId=5, firstName=EEEEE]
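
    As a small standalone sketch of what the record gives you for free (independent of Pulsar, with the enclosing class name UserRecordSketch chosen only for illustration): the compiler generates the accessors, toString(), equals(), and hashCode(), which is where the User[...] text in the output above comes from.

```java
import java.util.List;

public class UserRecordSketch {

    // Same shape as the User record in the sample app.
    public record User(String userId, String firstName) {}

    public static void main(String[] args) {
        List<User> users = List.of(new User("1", "AAAAA"), new User("2", "BBBB"));
        for (User user : users) {
            // Accessors are named after the record components; toString() is generated.
            System.out.println(user.userId() + " -> " + user);
        }
        // Records also get value-based equals() and hashCode().
        System.out.println(new User("1", "AAAAA").equals(users.get(0)));
    }
}
```

    This prints one line per user in the form 1 -> User[userId=1, firstName=AAAAA], followed by true for the equality check.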
    

    I hope this helps. Take care.