apache-kafka, lagom

Unable to override Lagom Kafka parameters


I created a plain Java project, put all the dependencies of the Lagom Kafka client on the classpath, and placed application.conf in the source folder.

Content of application.conf:

lagom.broker.kafka {
  service-name = ""

  brokers = "127.0.0.1:9092"

}

When running the application, service-name = "" should be used (so that my broker address is used directly rather than being discovered), but this did not work.

While debugging, I found that in the KafkaConfig class service-name comes out as "kafka_native".

I also found that when KafkaConfig is created, the conf object passed in doesn't have my application.conf in its origin.

After this, I tried overriding the values using VM parameters like this:

-Dlagom.broker.kafka.service-name=""
-Dlagom.broker.kafka.brokers="127.0.0.1:9092"
-Dakka.kafka.consumer.kafka-clients.auto.offset.reset="earliest"

and it worked.

Can somebody explain why the override in application.conf is not working?

This is how I am subscribing to the topic:

import java.net.URI;
import java.util.concurrent.CompletableFuture;

import com.ameyo.ticketing.ticket.api.TicketingService;
import com.ameyo.ticketing.ticket.api.events.TicketEvent;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import com.lightbend.lagom.javadsl.client.integration.LagomClientFactory;
import com.typesafe.config.ConfigFactory;

import akka.Done;
import akka.stream.javadsl.Flow;

/**
 * Standalone client that subscribes to the ticketEvents topic of TicketingService.
 */

public class Main {
    public static void main(String[] args) {

        String brokers = ConfigFactory.load().getString("lagom.broker.kafka.brokers");
        System.out.println("Initial Value for Brokers " + brokers);
        LagomClientFactory clientFactory = LagomClientFactory.create("legacy-system", Main.class.getClassLoader());
        TicketingService ticketTingService = clientFactory.createClient(TicketingService.class,
                URI.create("http://localhost:11000"));

        Topic<TicketEvent> ticketEvents = ticketTingService.ticketEvents();

        ticketEvents.subscribe().withGroupId("nya13").atLeastOnce(Flow.<TicketEvent> create().mapAsync(1, e -> {
            // "kuch to aaya" is Hindi for "something arrived".
            System.out.println("kuch to aaya");
            return CompletableFuture.completedFuture(Done.getInstance());
        }));

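        // Keep the JVM alive so the subscriber keeps running.
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e1) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }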
    }
}

Solution

  • Change the configuration so that the lagom.broker.kafka block is nested under akka:

    akka {
        lagom.broker.kafka {
            service-name = ""
            brokers = "127.0.0.1:9092"
        }
    }

    With this change, the override took effect.
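
As background on why the -D flags in the question took effect: in Typesafe Config, which the question's code loads through ConfigFactory, JVM system properties are layered on top of application.conf, which in turn sits on top of the reference.conf defaults. The following is a minimal sketch of that lookup order using plain Java maps. It is a simplified model, not the library's implementation: the class name ConfigPrecedenceSketch and the resolve helper are hypothetical, and the empty application.conf layer is an assumption standing in for a file that was not picked up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified model of layered config lookup; NOT the Typesafe Config implementation. */
public class ConfigPrecedenceSketch {

    /** Returns the value from the first (highest-priority) layer that defines the key. */
    public static Optional<String> resolve(String key, List<Map<String, String>> layersHighestFirst) {
        for (Map<String, String> layer : layersHighestFirst) {
            if (layer.containsKey(key)) {
                return Optional.of(layer.get(key));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String key = "lagom.broker.kafka.service-name";

        // Layer 1: JVM system properties, e.g. -Dlagom.broker.kafka.service-name=""
        Map<String, String> systemProperties = new HashMap<>();
        String fromJvm = System.getProperty(key);
        if (fromJvm != null) {
            systemProperties.put(key, fromJvm);
        }

        // Layer 2 (assumption): empty, modelling an application.conf that was not picked up.
        Map<String, String> applicationConf = new HashMap<>();

        // Layer 3: library default; "kafka_native" is the value observed in the question.
        Map<String, String> referenceConf = new HashMap<>();
        referenceConf.put(key, "kafka_native");

        Optional<String> resolved =
                resolve(key, Arrays.asList(systemProperties, applicationConf, referenceConf));
        System.out.println("service-name resolves to: \"" + resolved.orElse("<unset>") + "\"");
    }
}
```

When this sketch is started with -Dlagom.broker.kafka.service-name="", the system property layer supplies the empty string. Without the flag, the lookup falls through to the default, which matches the "kafka_native" value seen while debugging.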