Tags: java, spring-boot, jdbc, hikaricp, virtual-threads

Oh no! IO! Concurrency issues with virtual threads, HikariCP, and Spring Boot JDBC (Postgres)


I have run into an issue when using the following technologies:

  • PostgreSQL, running in a Docker container
  • Project Loom (virtual threads)
  • Spring Boot 3.3.4
  • Java 21
  • Spring Boot Starter JDBC (which includes HikariCP, which I believe is the source of the issue)
  • Spring Boot Starter Data JPA

And, obviously, a litany of other things. I can provide my POM on request, but I don't think it will be helpful. My PostgreSQL driver is up to date, as are all of my other dependencies.

The core issue: when I run many concurrent operations at high throughput, saving items to my database fails with I/O errors reported through HikariCP. I'm not sure what to do.
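Virtual threads make it cheap to start one task per message, but the number of database connections stays fixed (HikariCP's default `maximumPoolSize` is 10). A common way to keep thousands of virtual threads from piling onto a small resource is to bound access with a `Semaphore` rather than by pooling threads. The sketch below assumes a 10-connection pool, and `save` is a hypothetical stand-in for `saveAllAndFlush` that only sleeps; it reports the peak number of saves in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedDbAccess {
    // Assumption: the pool holds 10 connections (HikariCP's default maximumPoolSize).
    static final int POOL_SIZE = 10;

    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    BoundedDbAccess(int poolSize) {
        // One permit per pooled connection.
        this.permits = new Semaphore(poolSize);
    }

    // Hypothetical stand-in for a repository call such as saveAllAndFlush.
    void save(int item) throws InterruptedException {
        permits.acquire(); // waiting here is cheap for a virtual thread
        int now = inFlight.incrementAndGet();
        try {
            maxInFlight.accumulateAndGet(now, Math::max);
            Thread.sleep(5); // simulated database round trip
        } finally {
            inFlight.decrementAndGet();
            permits.release();
        }
    }

    // Submits `tasks` saves on virtual threads; returns the peak number running at once.
    static int run(int tasks, int poolSize) throws Exception {
        BoundedDbAccess db = new BoundedDbAccess(poolSize);
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                int item = i;
                futures.add(executor.submit(() -> {
                    db.save(item);
                    return null;
                }));
            }
            for (Future<Void> future : futures) {
                future.get(); // surfaces any failure from a save
            }
        }
        return db.maxInFlight.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Peak concurrent saves: " + run(1_000, POOL_SIZE));
    }
}
```

The peak never exceeds the permit count, so waiting happens in application code instead of at the pool's `getConnection` timeout.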

In my microservice's logs:

2024-09-19T18:50:51.617-04:00  WARN 10060 --- [virtual-ingest] [    virtual-342] com.zaxxer.hikari.pool.ProxyConnection   : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.

**In the Postgres container's Docker logs (repeated constantly):**

2024-09-19 22:50:51.807 UTC [412] LOG:  unexpected EOF on client connection with an open transaction
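SQLSTATE 08006 belongs to class 08 ("connection exception"), and PostgreSQL lists it as `connection_failure`. The error therefore concerns the socket between the driver and Postgres rather than the SQL being executed, which is consistent with HikariCP marking the connection as broken and Postgres logging an EOF while a transaction was still open. To separate these failures from ordinary SQL errors in your own logging or retry logic, a small helper can walk the cause chain; `isConnectionFailure` is a hypothetical name, not a HikariCP or PgJDBC API.

```java
import java.sql.SQLException;

public class SqlStateCheck {
    // True if any SQLException in the cause chain has an SQLSTATE in class "08"
    // (connection exception), e.g. PostgreSQL's 08006 connection_failure.
    static boolean isConnectionFailure(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SQLException sql) {
                String state = sql.getSQLState();
                if (state != null && state.startsWith("08")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SQLException broken = new SQLException(
            "An I/O error occurred while sending to the backend.", "08006");
        SQLException duplicateKey = new SQLException("duplicate key", "23505");
        System.out.println(isConnectionFailure(broken));                       // true
        System.out.println(isConnectionFailure(duplicateKey));                 // false
        System.out.println(isConnectionFailure(new RuntimeException(broken))); // true
    }
}
```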

My code (I haven't gone back to clean this up yet):

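    @EventListener(ApplicationReadyEvent.class)
    public void startStructures()
    {
        // One long-running subtask per configured queue.
        // StructuredTaskScope is a preview API in Java 21 (requires --enable-preview).
        try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
        {
            sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
                processQueueMessages(queue);
                return null;
            }));
            // join() only waits; throwIfFailed() rethrows the first subtask failure.
            outerScope.join().throwIfFailed();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.error("Processing interrupted", e);
        }
        catch (ExecutionException e)
        {
            log.error("Queue processing failed", e.getCause());
        }
    }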

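    private void processQueueMessages(String queue)
    {
        while (true)
        {
            try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
            {
                // Long poll: up to 10 messages, waiting up to 10 seconds.
                List<Message> messages = retrieveMessages(queue);
                if (messages.isEmpty())
                {
                    log.info("No messages in queue: {}, sleeping...", queue);
                    Thread.sleep(5000);
                }
                else
                {
                    // fork() runs each message on its own virtual thread.
                    for (Message message : messages)
                    {
                        innerScope.fork(() -> {
                            processMessage(queue, message);
                            return null;
                        });
                    }
                    // join() only waits; throwIfFailed() rethrows the first failure.
                    innerScope.join().throwIfFailed();
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                log.error("Queue processing interrupted", e);
                break;
            }
            catch (ExecutionException e)
            {
                // By now ShutdownOnFailure has cancelled (interrupted) this batch's other subtasks.
                log.error("Message processing failed on queue: {}", queue, e.getCause());
            }
        }
    }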

    public List<Message> retrieveMessages(String queue)
    {
        var url = awsConfig.getBaseUrl() + queue;
        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                                                             .queueUrl(url)
                                                             .maxNumberOfMessages(10)
                                                             .waitTimeSeconds(10) // long polling
                                                             .build();

        ReceiveMessageResponse response = sqsClient.receiveMessage(request);
        return response.messages();
    }

    private void processMessage(String queue, Message message) throws JsonProcessingException
    {

        log.info("Processing message from queue {}: {}", queue, message.messageId());

        JsonNode node = nodeBuilderService.buildNode(message);
        distributionService.handleSqsNotification(node);
        deleteMessage(queue, message);
    }

    private void deleteMessage(String queue, Message message)
    {
        var url = awsConfig.getBaseUrl() + queue;
        sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));

        int remainingMessages = getMessageCount(queue);
        log.info(
            "Deleted message: {}. Approximate {} messages remaining in the queue.", message.messageId(),
            remainingMessages
        );
    }

    public int getMessageCount(String queueName)
    {
        var url = awsConfig.getBaseUrl() + queueName;

        GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
                                                                     .queueUrl(url)
                                                                     .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
                                                                     .build();

        return Integer.parseInt(
            sqsClient.getQueueAttributes(request)
                     .attributes()
                     .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
        );
    }
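In `processQueueMessages`, `ShutdownOnFailure` shuts the scope down as soon as any subtask fails, and shutting down cancels the remaining subtasks by interrupting their threads; `join()` on its own also does not rethrow the failure (that is what `throwIfFailed()` is for). If the messages in a batch are independent, one option is to isolate failures so a single bad message cannot cancel its siblings mid-save. This sketch swaps in `Executors.newVirtualThreadPerTaskExecutor()` (final in Java 21, unlike the preview `StructuredTaskScope`); `handle` is a hypothetical stand-in for `processMessage`, and messages are plain strings for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IsolatedBatch {
    // Runs each message on its own virtual thread. A failure is recorded, but it does
    // not interrupt the other messages' threads (unlike ShutdownOnFailure).
    static List<String> processBatch(List<String> messages) throws InterruptedException {
        List<String> failed = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Void>> futures = new ArrayList<>();
            for (String message : messages) {
                futures.add(executor.submit(() -> {
                    handle(message);
                    return null;
                }));
            }
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (ExecutionException e) {
                    // Not deleting the message lets SQS redeliver it after the visibility timeout.
                    failed.add(messages.get(i));
                }
            }
        }
        return failed;
    }

    // Hypothetical stand-in for processMessage(queue, message).
    static void handle(String message) {
        if (message.startsWith("bad")) {
            throw new IllegalStateException("cannot decode " + message);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processBatch(List.of("a", "bad-1", "b", "c"))); // prints [bad-1]
    }
}
```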

While processing messages, my code eventually reaches this method:

    @Transactional
    public void process(String filename, InputStream stream)
    throws IOException
    {
        byte[] bytes = getBytesFromInputStream(stream, filename);

        List<SurfaceObservation> observations = decoder.beginSynopticDecoders(bytes, filename);
        repository.saveAllAndFlush(observations);
    }

That is where the error occurs.
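One mechanism worth ruling out (a possibility, not a diagnosis): JEP 444 specifies that blocking `java.net.Socket` operations are interruptible in virtual threads, and that interrupting a virtual thread blocked on a socket closes the socket. The PostgreSQL JDBC driver normally talks to the server over a `java.net.Socket`, so if the thread running `saveAllAndFlush` were interrupted mid-transaction (for example, by a scope cancelling its subtasks), the outcome could resemble the two log lines above. The demo uses a loopback socket pair instead of a database to show the JDK behavior in isolation; the class name is illustrative.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptClosesSocket {
    // Returns true if interrupting a virtual thread blocked in a socket read
    // ended with a SocketException and a closed socket.
    static boolean run() throws IOException, InterruptedException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {

            AtomicReference<IOException> failure = new AtomicReference<>();
            // Stands in for a JDBC driver waiting for the database's reply; the peer never writes.
            Thread reader = Thread.ofVirtual().start(() -> {
                try {
                    client.getInputStream().read(new byte[16]);
                } catch (IOException e) {
                    failure.set(e);
                }
            });

            Thread.sleep(200);  // give the reader time to block in read()
            reader.interrupt(); // what a cancelled subtask's thread receives
            reader.join();

            System.out.println("Reader saw: " + failure.get());
            return failure.get() instanceof SocketException && client.isClosed();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Socket closed by interrupt: " + run());
    }
}
```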

I have tried forcing this work onto a platform thread, but I believe the problem is HikariCP's lack of support for virtual threads, even in current versions. I followed the JEP Café episodes when writing my scopes, and I'm hoping someone can help me answer a few questions:

  1. Am I doing anything blatantly wrong? I'm migrating from async patterns and never want to use them again, so there's a good chance I don't know what I should be doing.
  2. Could ExtentLocal (since renamed ScopedValue) help? I think the problem in Hikari might be related to its use of ThreadLocal, but again, I'm new to this.
  3. Do I have to ditch Spring JDBC? What would I use instead? As far as I know, R2DBC still does not have a good ORM story at the time of writing, and Hibernate Reactive doesn't change that for me.
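On question 2: a virtual thread has its own `ThreadLocal` values like any other thread, but because code such as `StructuredTaskScope.fork` or `newVirtualThreadPerTaskExecutor` starts a fresh virtual thread per task, per-thread caches are rebuilt for every task instead of being reused. The sketch counts `ThreadLocal` initializations under both models. It illustrates the general effect only and does not exercise HikariCP; `ScopedValue` is a preview API in Java 21 (JEP 446), so it is not shown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalReuse {
    static final AtomicInteger initializations = new AtomicInteger();

    // A per-thread cache of the kind libraries sometimes keep in a ThreadLocal.
    static final ThreadLocal<StringBuilder> CACHE = ThreadLocal.withInitial(() -> {
        initializations.incrementAndGet(); // counts how often a fresh value is created
        return new StringBuilder();
    });

    // Runs `tasks` tasks that each touch the cache; returns how many values were created.
    static int countInitializations(ExecutorService executor, int tasks) {
        initializations.set(0);
        try (executor) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> CACHE.get().append('x'));
            }
        } // close() waits for every submitted task to finish
        return initializations.get();
    }

    public static void main(String[] args) {
        int tasks = 1_000;
        // One new virtual thread per task, so one initialization per task.
        System.out.println("Virtual threads: "
            + countInitializations(Executors.newVirtualThreadPerTaskExecutor(), tasks));
        // Four reused platform threads, so at most four initializations.
        System.out.println("4 platform threads: "
            + countInitializations(Executors.newFixedThreadPool(4), tasks));
    }
}
```

On its own, losing per-thread reuse costs allocations and cache hits; the sketch does not show it breaking a connection.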

I tried platform threads, and I tried a scope just for the DB transactions. I tried a lot of things for hours, and now I have that code-brain where I'm not even sure what I tried. I even asked ChatGPT for help and, as expected, it could not.


Solution

  • I am marking this as answered because I no longer see the problem after adding a Redis cache. I still believe HikariCP has trouble with highly concurrent applications that use virtual threads, and I will watch for a better solution.