Search code examples
quarkusidempotentcamel-ftpcamel-sqlapache-camel-cdi

Camel Quarkus SFTP Idempotent Consumer "this.transactionTemplate is null" error


I've been developping a Quarkus app with STFP routes, in which I would like to incorporate IdempotentConsumers. The end goal is to deploy multiple instances in my cluster and achieve Distributed Locking.

My setup is as follows:

<!--pom.xml-->

...
  <properties>
    <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
    <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
    <quarkus.platform.version>3.2.10.Final</quarkus.platform.version>

    <compiler-plugin.version>3.11.0</compiler-plugin.version>
    <maven.compiler.release>17</maven.compiler.release>
    <maven.compiler.target>17</maven.compiler.target>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget>
    <maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <groovy-maven-plugin.version>2.1.1</groovy-maven-plugin.version>
    <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
    <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version>

    <skipITs>true</skipITs>
    <surefire-plugin.version>3.0.0</surefire-plugin.version>

    <version.lombok>1.18.30</version.lombok>
    <version.ermis.commons>1.6.0</version.ermis.commons>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>${quarkus.platform.group-id}</groupId>
        <artifactId>${quarkus.platform.artifact-id}</artifactId>
        <version>${quarkus.platform.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>io.quarkus.platform</groupId>
        <artifactId>quarkus-camel-bom</artifactId>
        <version>${quarkus.platform.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
      <groupId>jakarta.persistence</groupId>
      <artifactId>jakarta.persistence-api</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-hibernate-orm</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jdbc-postgresql</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-liquibase</artifactId>
    </dependency>
    <!--    Camel Quarkus Core  -->
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-bean</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-ftp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-sql</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-xpath</artifactId>
    </dependency>
    <!--    Rest and JSON   -->
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-jackson</artifactId>
    </dependency>
    <!--    Utility -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>${version.lombok}</version>
      <scope>provided</scope>
    </dependency>
    <!--  Smallrye Health  -->
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-health</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-junit5</artifactId>
      <scope>test</scope>
    </dependency>
...

Following by the attempted configuration for the idempotence repository based on the JdbcCachedMessageIdRepository as found here: Using Jdbc Based Idempotent Repository

package com.epav.configuration;

import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.camel.Configuration;
import org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.sql.DataSource;


@Configuration
public class FtpIdempotentRepositoryConfiguration {

    @ConfigProperty(name = "trader.integration.instance.name")
    String instanceName;

    @Singleton
    DataSource dataSource;

    @Singleton
    @Named(value = "FtpIdempotentRepository")
    public JdbcCachedMessageIdRepository jdbcCachedMessageRepository(){
        JdbcCachedMessageIdRepository repository = new JdbcCachedMessageIdRepository();
        repository.setDataSource(dataSource);
        repository.setProcessorName(instanceName);
        return repository;
    }
}

The Route in which I am trying to use the above repository:


import com.intrasoft.configuration.properties.FtpRoutingProperties;
import jakarta.inject.Singleton;
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository;

@Singleton
@RequiredArgsConstructor
public class IncomingRoute extends RouteBuilder {

    private final FtpRoutingProperties ftpRoutingProperties;

    private final JdbcCachedMessageIdRepository jdbcCachedMessageIdRepository;

    @Override
    public void configure() {
        String sftpUrl = String.format("sftp://%s:%s@%s:",
                ftpRoutingProperties.username,
                ftpRoutingProperties.password,
                ftpRoutingProperties.host);

        from(sftpUrl + ftpRoutingProperties.edbInputDir)
                .idempotentConsumer(header("CamelFileAbsolutePath"))
                .idempotentRepository(jdbcCachedMessageIdRepository)
                .completionEager(true)
                .process(CUSTOM_PROCESSOR_NAME)
                .to(sftpUrl + ftpRoutingProperties.successDir)
                .stop();

Finally the application properties used are the following:

quarkus.banner.enabled=false
quarkus.application.name=custom-app-name
trader.integration.instance.name=${pod.ip:${quarkus.application.name}}

quarkus.http.port=9090

quarkus.log.level=INFO
quarkus.log.console.format=%d{HH:mm:ss} %-5p [%c] (%t) %s%e%n

quarkus.devservices.enabled=false
%dev.quarkus.log.category."org.apache.camel".level=ERROR

%dev.quarkus.datasource.db-kind=postgresql
%dev.quarkus.datasource.username=idemp_db
%dev.quarkus.datasource.password=idemp_db
%dev.quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/idemp_db
%dev.quarkus.hibernate-orm.log.sql=false

quarkus.liquibase.migrate-at-start=true
quarkus.liquibase.change-log=db/master.xml
...

And arriving at the crux of the subject. During initialization after the liquibase migration has completed I am met with the following error:

16:10:03 INFO  [liquibase.database] (Quarkus Main Thread) Set default schema name to public
 16:10:03 INFO  [liquibase.executor] (Quarkus Main Thread) Changelog query completed.
16:10:03 INFO  [liquibase.executor] (Quarkus Main Thread) Changelog query completed.
16:10:03 INFO  [liquibase.executor] (Quarkus Main Thread) Changelog query completed.
16:10:03 INFO  [liquibase.lockservice] (Quarkus Main Thread) Successfully acquired change log lock
16:10:04 INFO  [liquibase.changelog] (Quarkus Main Thread) Reading from public.databasechangelog
16:10:04 INFO  [liquibase.executor] (Quarkus Main Thread) Changelog query completed.
Database is up to date, no changesets to execute
16:10:04 INFO  [liquibase.executor] (Quarkus Main Thread) Changelog query completed.
16:10:04 INFO  [liquibase.lockservice] (Quarkus Main Thread) Successfully released change log lock
16:10:07 ERROR [org.apache.camel.impl.engine.AbstractCamelContext] (Quarkus Main Thread) Error starting CamelContext (camel-1) due to exception thrown: Failed to start route sftp-incoming-edb-route because of java.lang.NullPointerException: Cannot invoke "org.springframework.transaction.support.TransactionTemplate.execute(org.springframework.transaction.support.TransactionCallback)" because "this.transactionTemplate" is null: org.apache.camel.FailedToStartRouteException: Failed to start route sftp-incoming-route because of java.lang.NullPointerException: Cannot invoke "org.springframework.transaction.support.TransactionTemplate.execute(org.springframework.transaction.support.TransactionCallback)" because "this.transactionTemplate" is null
    at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:122)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:306)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:189)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2827)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2509)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2464)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2100)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:313)
    at org.apache.camel.quarkus.main.CamelMain.doStart(CamelMain.java:94)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.quarkus.main.CamelMain.startEngine(CamelMain.java:140)
    at org.apache.camel.quarkus.main.CamelMainRuntime.start(CamelMainRuntime.java:49)
    at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
    at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot173480958.deploy_0(Unknown Source)
    at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot173480958.deploy(Unknown Source)
    at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
    at io.quarkus.runtime.Application.start(Application.java:101)
    at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
    at io.quarkus.runner.GeneratedMain.main(Unknown Source)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
    at java.base/java.lang.Thread.run(Thread.java:840)

Since the database is up, running and available, the connection as well, as indicated from the liquibase logs, I am leaning towards underlying conflicting dependencies, or the camel-quarkus-sql : javax.sql.DataSource does not initialize properly.

Note: I am using hibernate as well for some of my entity classes and the entityManagers injected are all resolving and working properly.

Any help will be greatly appreciated and if anyone has any tips on the distributed locking approach as well, it would be of great importance!

Looking for a solution without an external service / extra pods ( or master "routing" pod ).


Solution

  • The problem is that you use the no-args constructor of JdbcCachedMessageIdRepository, which does not set up the TransactionTemplate. It should work if you change things as follows.

    JdbcCachedMessageIdRepository repository = new JdbcCachedMessageIdRepository(dataSource, instanceName);