Search code examples
javaspringspring-bootspring-kafka

How do I Restart a shutdown embeddedKafkaServer in a Spring Unit Test?


I have a Spring Boot unit test that exercises the switch-back capability of my application when the primary Kafka cluster comes back online.

The application successfully switches to secondary when the primary goes offline. Now we're adding the ability to switch back to primary on a timer instead of failure.

My test method looks like this:

   //Rochelle = Primary BootStrapServers
   //Hudson   = Secondary BootStrapServers


   /**
    * Exercises a full fail-over / switch-back cycle:
    * <ol>
    *   <li>confirm the producer starts on the primary cluster (Rochelle);</li>
    *   <li>shut the primary down and wait for the switch to the secondary (Hudson);</li>
    *   <li>send and receive a message on the secondary;</li>
    *   <li>restart the primary, trigger {@code primarySwitch()} and wait for the switch back;</li>
    *   <li>send and receive a message on the primary again.</li>
    * </ol>
    *
    * @throws Exception if polling for a record or the un-guarded broker wait is interrupted
    */
   @Test
   public void send_switchback() throws Exception
   {
      //Get ABSwitchCluster to check failover details
      KafkaSwitchCluster ktSwitch = (KafkaSwitchCluster)
              ((BootStrapExposerProducerFactory)
                       kafkaTemplate.getProducerFactory()).getBootStrapSupplier();

      //Sanity check: everything starts out pointing at the primary
      assertThat(ktSwitch,             notNullValue());
      assertThat(ktSwitch.get(),       is(Rochelle));
      assertThat(ktSwitch.isPrimary(), is(true));

      assertThat(getBootStrapServersList(), is(Rochelle));

      log.info("Shutdown Broker to test Failover.");

      //Shutdown Primary Servers to simulate disconnection
      shutdownBroker_primary();
      //Allow for fail over to happen
      if ( ktSwitch.isPrimary() )
      {
         try
         {
            synchronized (lock)
            {  //pause to give Idle Event a chance to fire
               //poll for cluster switch: re-check after every timed wait,
               //giving up after (timeOut + 1) waits of up to 15 seconds each
               for (int i = 0; i <= timeOut && ktSwitch.isPrimary(); ++i)
               {
                  lock.wait(Duration.ofSeconds(15).toMillis());
               }
            }
         }
         catch (InterruptedException interrupted)
         {
            //keep the interrupt visible to the test runner before failing
            Thread.currentThread().interrupt();
            fail("Unable to wait for cluster switch. " + interrupted.getMessage());
         }
      }

      //Confirm Failover has happened
      assertThat(ktSwitch.get(),            is(Hudson));
      assertThat(ktSwitch.isPrimary(),      is(false));
      assertThat(getBootStrapServersList(), is(Hudson));

      assertThat(kafkaSwitchCluster.get(),       is(Hudson));
      assertThat(kafkaSwitchCluster.isPrimary(), is(false));

      //Send a message on backup server
      String message = "Test Failover";
      send(message);

      String msg = records.poll(10, TimeUnit.SECONDS);
      assertThat(msg, notNullValue());
      assertThat(msg, is(message));

      //Bring the primary back
      startup_primary();

      assertThat(embeddedKafka.getBrokersAsString(), is(Rochelle));
      String brokers = embeddedKafka.getBrokersAsString();

      //Give the restarted brokers up to 16 one-second waits to become reachable,
      //stopping early if the listener registry is no longer running
      if ( !kafkaProducerErrorHandler.areBrokersUp(brokers) )
      {
         synchronized (lock)
         {
            for ( int i=0;
                  i <= 15 && !kafkaProducerErrorHandler.areBrokersUp(brokers)
                  && registry.isRunning();
                  ++i )
            { lock.wait(Duration.ofSeconds(1).toMillis()); }
         }
      }

      //TODO: test Scheduled Fire
      //The scheduled switch-back is invoked directly rather than waiting for the timer
      kafkaProducerErrorHandler.primarySwitch();

      if ( !kafkaSwitchCluster.isPrimary() )
      {
         try
         {
            synchronized (lock)
            {  //pause to give Idle Event a chance to fire
               //poll for cluster switch back to primary, same bounds as the fail-over wait
               for (int i = 0; i <= timeOut && !kafkaSwitchCluster.isPrimary(); ++i)
               {
                  lock.wait(Duration.ofSeconds(15).toMillis());
               }
            }
         }
         catch (InterruptedException interrupted)
         {
            //keep the interrupt visible to the test runner before failing
            Thread.currentThread().interrupt();
            fail("Unable to wait for cluster switch. " + interrupted.getMessage());
         }
      }

      //Confirm switch back to primary has happened
      assertThat(brokers,              anyOf(is(Rochelle), is(Hudson))); //port didn't change
      assertThat(brokers,              is(Rochelle)); //is primary
      assertThat(kafkaSwitchCluster.isPrimary(), is(true));
      assertThat(ktSwitch.get(),       is(brokers));

      assertThat(kafkaProducerErrorHandler.areBrokersUp(brokers),  is(true));
      assertThat(kafkaProducerErrorHandler.areBrokersUp(Rochelle), is(true));

      assertThat(ktSwitch.isPrimary(), is(true));
      assertThat(ktSwitch.get(),       is(embeddedKafka.getBrokersAsString()));

      //Send a message on the restored primary server
      message = "Test newPrimary";
      send(message);

      msg = records.poll(10, TimeUnit.SECONDS);
      assertThat(msg, notNullValue());
      assertThat(msg, is(message));

      log.info("Test is finished");
   }

I'm using this method to shut down my primary embedded Kafka:

   /**
    * Stops the primary embedded cluster in two passes: {@code shutdown()} is
    * called on every broker first, and only then is {@code awaitShutdown()}
    * called on each of them.
    */
   public void shutdownBroker_primary()
   {
      //First pass: ask every broker to shut down
      embeddedKafka.getKafkaServers().forEach(KafkaServer::shutdown);
      //Second pass: wait on each broker in turn
      embeddedKafka.getKafkaServers().forEach(KafkaServer::awaitShutdown);
   }

I'm using this to restart Kafka:

/**
 * Restarts the primary embedded Kafka cluster, then starts the listener registry.
 *
 * Calling startup() again on KafkaServer instances that were already shut down
 * does not bring the cluster back. The embedded broker is therefore torn down
 * with destroy() and rebuilt with afterPropertiesSet(), the sequence shown in
 * the accepted answer. The embedded broker was not designed for this use, and
 * data written before the restart is not retained.
 */
public void startup_primary()
   {
      //Release whatever is left of the previous instance before rebuilding it
      embeddedKafka.destroy();
      //Bring the embedded cluster back up
      embeddedKafka.afterPropertiesSet();
      registry.start();
   }

primarySwitch() is a scheduled event that switches the cluster back to primary. It is called directly in the test. It's a wrapper around the same code that switches the in-use cluster when Kafka goes down.

How do I get the primary embedded Kafka cluster to start successfully after I shut it down, so I can prove that the application can move back to the primary cluster once it's available again?


UPDATE:
I have created a code example on GitHub with what I have so far: https://github.com/raystorm/Kafka-Example .


UPDATE 2: The linked repository above has been updated based on the accepted answer below, and now all tests pass.


Solution

  • It wasn't really designed for this use case, but the following works, as long as you don't need to retain data between the broker instances...

    @SpringBootTest
    @EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class So64145670ApplicationTests {

        @Autowired
        private EmbeddedKafkaBroker broker;

        /**
         * Sends one record, destroys and re-initializes the embedded broker,
         * then sends a second record through the same template.
         */
        @Test
        void restartBroker(@Autowired KafkaTemplate<String, String> template) throws Exception {
            publishAndPrint(template, "foo");

            // stop the broker, then bring it back up
            this.broker.destroy();
            this.broker.afterPropertiesSet();

            publishAndPrint(template, "bar");
        }

        /** Sends {@code payload} to the test topic, waits up to 10 seconds, and prints the record metadata. */
        private void publishAndPrint(KafkaTemplate<String, String> template, String payload) throws Exception {
            SendResult<String, String> outcome = template.send("so64145670", payload).get(10, TimeUnit.SECONDS);
            System.out.println("+++" + outcome.getRecordMetadata());
        }

    }
    

    EDIT

    Here's one with two brokers...

    @SpringBootTest(classes = { So64145670Application.class, So64145670ApplicationTests.Config.class })
    @EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class So64145670ApplicationTests {

        @Autowired
        private EmbeddedKafkaBroker embeddedKafka;

        @Autowired
        private EmbeddedKafkaBroker secondBroker;

        /**
         * Sends a record to each of the two embedded brokers, destroys and
         * re-initializes both, then sends a second record to each.
         */
        @Test
        void restartBroker(@Autowired KafkaTemplate<String, String> template,
                @Autowired ProducerFactory<String, String> pf) throws Exception {

            publishAndPrint(template, "so64145670", "foo");

            // second template: same producer factory, bootstrap servers overridden to the second broker
            Map<String, Object> secondBrokerOverrides =
                    Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.secondBroker.getBrokersAsString());
            KafkaTemplate<String, String> secondTemplate = new KafkaTemplate<>(pf, secondBrokerOverrides);
            publishAndPrint(secondTemplate, "so64145670-1", "foo");

            // stop both brokers ...
            this.embeddedKafka.destroy();
            this.secondBroker.destroy();
            // ... and bring them back in the same order
            this.embeddedKafka.afterPropertiesSet();
            this.secondBroker.afterPropertiesSet();

            publishAndPrint(template, "so64145670", "bar");
            publishAndPrint(secondTemplate, "so64145670-1", "bar");
        }

        /** Sends {@code payload} to {@code topic}, waits up to 10 seconds, and prints the record metadata. */
        private static void publishAndPrint(KafkaTemplate<String, String> sender, String topic, String payload)
                throws Exception {
            SendResult<String, String> outcome = sender.send(topic, payload).get(10, TimeUnit.SECONDS);
            System.out.println("+++" + outcome.getRecordMetadata());
        }

        /** Registers the second embedded broker as a bean. */
        @Configuration
        public static class Config {

            @Bean
            EmbeddedKafkaBroker secondBroker() {
                EmbeddedKafkaBroker second = new EmbeddedKafkaBroker(1, true, "so64145670-1");
                return second.brokerListProperty("spring.kafka.second.server");
            }

        }

    }
    
    +++so64145670-1@0
    +++so64145670-1-0@0
    +++so64145670-1@0
    +++so64145670-1-0@0