Tags: java, spring-boot, apache-camel, minio

Unable to retrieve a MinIO object through Apache Camel


I'm facing a problem retrieving an object from a MinIO server through Apache Camel.

I'm using a third-party library (which I cannot change directly) that uses the following approach to connect to Camel and download objects:

ConsumerTemplate template = context.createConsumerTemplate();
byte[] content = template.receiveBody(uri, timeout, byte[].class);

To this code I pass my Camel-flavored URI for MinIO in the following format:

String camelUri = "minio://myBucketName?prefix=hello.txt";
 

I'm configuring the Apache Camel component like this for MinIO:

@Bean
public MinioClient minioClient() {
    return new MinioClient.Builder()
            .credentials(accessKey, secretKey)
            .endpoint(url)
            .build();
}

@Bean
public CamelContext camelContext(MinioClient client) {
    CamelContext context = new DefaultCamelContext();
    context.setTracing(true);
    context.start();

    MinioComponentBuilder minioCompBuilder =
            ComponentsBuilderFactory.minio().minioClient(client).secure(true);
    minioCompBuilder.register(context, "minio");
    return context;
}

With the TRACE log level enabled I can see that Camel is able to establish a connection, first verifying that the bucket already exists, but nothing is returned.

Following the configuration options that can be passed in the query string, I also tried the objectName option:

String camelUri = "minio://myBucketName?objectName=hello.txt";

Still nothing is returned.

In the log:

"message":"Starting service: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Querying whether bucket myBucketName already exists..."
"message":"Bucket myBucketName already exists"
"message":"Started service: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"<<<< minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Creating service from endpoint: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Creating EventDrivenPollingConsumer with queueSize: 1000 blockWhenFull: true blockTimeout: 0 copy: false"
"message":"Building service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Built service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Initializing service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Building service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Build consumer: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Building service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Warming up PrototypeExchangeFactory loaded class: org.apache.camel.support.DefaultExchange"
"message":"Built service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Warming up DefaultConsumer loaded class: org.apache.camel.support.DefaultConsumer$DefaultConsumerCallback"
"message":"Built service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Initializing service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Init consumer: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Initializing service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Initialized service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Initialized service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Initialized service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Starting service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Started service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Acquired service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Before poll minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Starting service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Starting consumer: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Starting service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Started service: org.apache.camel.impl.engine.PrototypeExchangeFactory@427e563c"
"message":"Service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false already started"
"message":"Building service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Built service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Initializing service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Initialized service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Starting service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Created new ScheduledThreadPool for source: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false] with name: minio://myBucketName?prefix=hello.txt&startScheduler=false -> org.apache.camel.util.concurrent.SizedScheduledExecutorService@1eef6e57[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Started service: org.apache.camel.support.DefaultScheduledPollConsumerScheduler@1c0e57e4"
"message":"Scheduling 1 consumers poll (fixed delay) with initialDelay: 1000, delay: 500 (milliseconds) for: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"canScheduleOrExecute 0 < 1000 -> true"
"message":"Created thread[Camel (camel-1) thread #1 - minio://myBucketName] -> Thread[Camel (camel-1) thread #1 - minio://myBucketName,5,main]"
"message":"Started service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"After poll minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Suspending service MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Suspending service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Suspended service: MinioConsumer[minio://myBucketName?prefix=hello.txt&startScheduler=false]"
"message":"Released service: PollingConsumer on minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Scheduled task started on:   minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Cannot start to poll: minio://myBucketName?prefix=hello.txt&startScheduler=false as its suspended"
"message":"Scheduled task completed on: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Scheduled task started on:   minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Cannot start to poll: minio://myBucketName?prefix=hello.txt&startScheduler=false as its suspended"
"message":"Scheduled task completed on: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Scheduled task started on:   minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Cannot start to poll: minio://myBucketName?prefix=hello.txt&startScheduler=false as its suspended"
"message":"Scheduled task completed on: minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Scheduled task started on:   minio://myBucketName?prefix=hello.txt&startScheduler=false"
"message":"Cannot start to poll: minio://myBucketName?prefix=hello.txt&startScheduler=false as its suspended"
..... and this goes on until I stop the application.

I'm new to Camel and I don't understand why it also creates a PollingConsumer and a scheduler by default (which I tried to disable by passing startScheduler=false), or why the scheduled poll then keeps trying to start and fails. This probably belongs in a separate question; I don't think it is related to my problem.
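The repeated "Cannot start to poll ... as its suspended" lines can be reproduced with plain java.util.concurrent classes. What follows is only a sketch of the timing visible in the log (a scheduler with an initial delay, and a caller that gives up and suspends the consumer before the first scheduled run); it is not Camel's actual implementation. The class name SuspendedPollSketch, its Counts type and the run method are hypothetical names made up for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the log above; not Camel code.
public class SuspendedPollSketch {

    /** How many scheduled runs actually polled and how many were skipped. */
    public static final class Counts {
        public final int polled;
        public final int skipped;

        Counts(int polled, int skipped) {
            this.polled = polled;
            this.skipped = skipped;
        }
    }

    /**
     * Schedules a poll with a fixed delay, lets the "caller" wait for
     * receiveTimeoutMs, then suspends the consumer and keeps observing
     * the scheduler for observeMs.
     */
    public static Counts run(long initialDelayMs, long delayMs,
                             long receiveTimeoutMs, long observeMs) {
        AtomicBoolean suspended = new AtomicBoolean(false);
        AtomicInteger polled = new AtomicInteger();
        AtomicInteger skipped = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                if (suspended.get()) {
                    skipped.incrementAndGet();   // "Cannot start to poll ... as its suspended"
                } else {
                    polled.incrementAndGet();    // a real poll would fetch the object here
                }
            }, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);

            Thread.sleep(receiveTimeoutMs);      // caller waits for a message
            suspended.set(true);                 // caller gives up, consumer is suspended
            Thread.sleep(observeMs);             // scheduler keeps firing anyway
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new Counts(polled.get(), skipped.get());
    }

    public static void main(String[] args) {
        // initialDelay and delay taken from the log; the 200 ms receive timeout is an assumption.
        Counts counts = run(1000, 500, 200, 3000);
        System.out.println("polled=" + counts.polled + ", skipped=" + counts.skipped);
    }
}
```

In this model, a caller that stops waiting before the initial delay has elapsed never sees a single real poll, and every later scheduled run is skipped, which matches the shape of the log.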

My dependencies:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-minio</artifactId>
    <version>3.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-componentdsl</artifactId>
    <version>3.14.4</version>
</dependency>
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.4.1</version>
</dependency>

On the MinIO server the file sits directly under the bucket: myBucketName/hello.txt

I also tested a direct call using MinioClient (the same instance that is injected into MinioComponentBuilder):

GetObjectResponse getObj = minioClient.getObject(GetObjectArgs.builder()
        .bucket("myBucketName")
        .object("hello.txt")
        .build());
String input = new String(getObj.readAllBytes());
log.info("TXT CONTENT: {}", input);

        

This works fine and prints the content of the text file.

I'm probably doing something wrong with the URI syntax or the way I compose it, but I cannot figure it out.


Solution

  • I'm working with Stefano. The issue is related to how the Camel MinIO component is implemented. ConsumerTemplate.receiveBody has overloaded variants: if a timeout is required, set it to at least 1100 milliseconds, otherwise the result will be null; if no timeout is required, use the overload without the timeout parameter.

    With timeout:

    ConsumerTemplate template = context.createConsumerTemplate();
    byte[] content = template.receiveBody(uri, 1100, byte[].class);
    

    Without timeout:

    ConsumerTemplate template = context.createConsumerTemplate();
    byte[] content = template.receiveBody(uri, byte[].class);
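
    One plausible reading of the 1100 ms threshold, based on the question's log ("Scheduling 1 consumers poll (fixed delay) with initialDelay: 1000, delay: 500"), is that the first poll only runs about one second after the consumer starts, so any shorter wait ends before a message can arrive. The sketch below models that with a BlockingQueue and a delayed task from java.util.concurrent. It is an assumption-laden illustration, not Camel code: ReceiveTimeoutSketch and its receiveBody method are hypothetical names, and the "hello" payload is a stand-in for the object content.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "receive with timeout" against a delayed first poll; not Camel code.
public class ReceiveTimeoutSketch {

    /**
     * Waits up to timeoutMs for a body that the simulated poll delivers
     * only after initialDelayMs. Returns null if the wait times out first.
     */
    public static String receiveBody(long initialDelayMs, long timeoutMs) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The first (and only) poll fires after the initial delay.
            scheduler.schedule(() -> { queue.offer("hello"); },
                    initialDelayMs, TimeUnit.MILLISECONDS);
            // The caller blocks until a body arrives or the timeout expires.
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // 1000 ms initial delay as in the log; the timeouts are the ones under discussion.
        System.out.println("timeout 500:  " + receiveBody(1000, 500));
        System.out.println("timeout 1100: " + receiveBody(1000, 1100));
    }
}
```

    If this reading is right, the safe timeout depends on the consumer's initial delay rather than on a fixed 1100 ms, so a comfortable margin above that delay is the more robust choice.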