google-cloud-pubsub micronaut google-cloud-pubsub-emulator

Micronaut 3: How to use PubSubEmulatorContainer


Update: The link to the repo has moved to the answer, because the repo now contains the code from the answer below.

Problem description

The current code works, but the integration tests rely on gcloud beta emulators pubsub from the google/cloud-sdk image.

  • Integration tests are slow because the google/cloud-sdk image is large.
  • The Pub/Sub emulator has to run on a fixed port, because there seems to be no way to tell Micronaut which port the emulator is running on.

Because of the fixed port, I have to set the following environment variable in maven-surefire-plugin:

<environmentVariables>
    <PUBSUB_EMULATOR_HOST>localhost:8085</PUBSUB_EMULATOR_HOST>
</environmentVariables>

How this can be done in Spring Boot

According to the Testcontainers GCloud Module documentation, the correct way to implement integration tests with PubSubEmulatorContainer in Spring Boot looks like this: https://github.com/saturnism/testcontainers-gcloud-examples/blob/main/springboot/pubsub-example/src/test/java/com/example/springboot/pubsub/PubSubIntegrationTests.java

This brings up the container on a random port, which is possible because of Spring's DynamicPropertyRegistry. Micronaut seems to lack an equivalent.
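For reference, the random port itself comes from Testcontainers rather than from Spring. The following is a minimal Groovy sketch of that part only, assuming the Testcontainers gcloud module is on the test classpath and Docker is available; the image name is the one commonly used with this container and may need adjusting.

```groovy
import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName

// Start the emulator; Testcontainers maps the container port to a free host port.
def emulator = new PubSubEmulatorContainer(
        DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))
emulator.start()
try {
    // "host:port", where port is the randomly mapped host port
    String endpoint = emulator.emulatorEndpoint
    println "Emulator listening on ${endpoint}"
} finally {
    emulator.stop()
}
```

The open question is then only how to hand that endpoint to Micronaut before the application context starts.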

Doc: https://www.testcontainers.org/modules/gcloud/


I'm looking for a working example of a JUnit 5 or Spock integration test in Micronaut 3.x that uses PubSubEmulatorContainer as described in the doc above.

Related doc: https://micronaut-projects.github.io/micronaut-gcp/latest/guide/#emulator


There are some comments on GitHub around configuring TransportChannelProvider. I'm able to inject an instance and inspect it, but I still haven't found out exactly what to do.

These are the closest leads so far: https://github.com/micronaut-projects/micronaut-gcp/issues/257 https://github.com/micronaut-projects/micronaut-gcp/pull/259


Solution

  • Update 2023-07-23 Bumped to Micronaut 4.0.1, refactored away Lombok in Java-demo. Link: pubsub-emulator-demo repo

    Update 2023-05-01 Updated pubsub-emulator-demo repo with Kotlin/Kotest-example.

    TL;DR

    We need to start the Testcontainers container first, read the emulator host address from it, and then pass that address to ApplicationContext.run like this:

    applicationContext = ApplicationContext.run(
            ["pubsub.emulator.host": emulatorHost])
    

    Small GitHub repo with example code: https://github.com/roar-skinderviken/pubsub-emulator-demo
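As a possible alternative to calling ApplicationContext.run by hand, micronaut-test offers the TestPropertyProvider interface, which plays a role similar to Spring's DynamicPropertyRegistry. The sketch below is an assumption-laden illustration, not code from the linked repo: the class name EmulatorPropertySpec is hypothetical, it assumes micronaut-test-spock and the Testcontainers gcloud module are on the test classpath, and it only checks that the property reaches the context. A real test would most likely still need the credentials replacement shown later in this answer.

```groovy
import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.micronaut.test.support.TestPropertyProvider
import jakarta.inject.Inject
import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.Specification

@MicronautTest
class EmulatorPropertySpec extends Specification implements TestPropertyProvider {

    // Static so the container exists before micronaut-test asks for properties.
    static final PubSubEmulatorContainer EMULATOR = new PubSubEmulatorContainer(
            DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))

    @Inject
    ApplicationContext applicationContext

    // Called by micronaut-test before the application context is started.
    @Override
    Map<String, String> getProperties() {
        if (!EMULATOR.running) {
            EMULATOR.start()
        }
        ["pubsub.emulator.host": EMULATOR.emulatorEndpoint]
    }

    def cleanupSpec() {
        EMULATOR.stop()
    }

    def "the context sees the random emulator endpoint"() {
        expect:
        applicationContext.getProperty("pubsub.emulator.host", String).get() ==
                EMULATOR.emulatorEndpoint
    }
}
```

With this approach the topic and subscription would still have to be created before any listener connects, so the manual two-step startup in the long answer may remain the simpler choice.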

    Long answer with code

    I finally managed to make a working solution using Micronaut 3.0.2 and Spock. A related Micronaut PR got me on track, together with this article: Micronaut Testing Best Practices https://objectcomputing.com/files/9815/9259/7089/slide_deck_Micronaut_Testing_Best_Practices_webinar.pdf

    First a PubSubEmulator class (Groovy)

    package no.myproject.testframework.testcontainers
    
    import org.testcontainers.containers.PubSubEmulatorContainer
    import org.testcontainers.utility.DockerImageName
    
    class PubSubEmulator {
        static PubSubEmulatorContainer pubSubEmulatorContainer
    
        static init() {
            if (pubSubEmulatorContainer == null) {
                pubSubEmulatorContainer = new PubSubEmulatorContainer(
                        DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))
                pubSubEmulatorContainer.start()
            }
        }
    }
    

    Then a fixture for PubSubEmulator (Groovy)

    package no.myproject.testframework.testcontainers
    
    trait PubSubEmulatorFixture {
        Map<String, Object> getPubSubConfiguration() {
            if (PubSubEmulator.pubSubEmulatorContainer == null || !PubSubEmulator.pubSubEmulatorContainer.isRunning()) {
                PubSubEmulator.init()
            }
            [
                    "pubsub.emulator-host": PubSubEmulator.pubSubEmulatorContainer.getEmulatorEndpoint()
            ]
        }
    }
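To make the fixture's contract concrete, here is a small sketch of a spec that uses it. The spec name is hypothetical, and it assumes Docker is available and that the two classes above compile as shown. It checks that the value is a host:port string and that a second call reuses the container started by the first.

```groovy
package no.myproject.testframework.testcontainers

import spock.lang.Specification

class PubSubEmulatorFixtureSpec extends Specification implements PubSubEmulatorFixture {

    def "fixture starts the emulator once and reports host:port"() {
        when: "the configuration is requested twice"
        def first = pubSubConfiguration
        def containerAfterFirstCall = PubSubEmulator.pubSubEmulatorContainer
        def second = pubSubConfiguration

        then: "the endpoint looks like host:port"
        first["pubsub.emulator-host"] ==~ /.+:\d+/

        and: "the same container instance is reused"
        PubSubEmulator.pubSubEmulatorContainer.is(containerAfterFirstCall)
        second == first
    }
}
```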
    

    Then a specification class (Groovy) that starts the container, creates a topic and a subscription.

    The key here is to pass in pubsub.emulator.host as part of the configuration when calling ApplicationContext.run.

    The remaining code is very similar to the Spring Boot example I linked to in my question.

    package no.myproject.testframework
    
    import com.google.api.gax.core.NoCredentialsProvider
    import com.google.api.gax.grpc.GrpcTransportChannel
    import com.google.api.gax.rpc.FixedTransportChannelProvider
    import com.google.cloud.pubsub.v1.SubscriptionAdminClient
    import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
    import com.google.cloud.pubsub.v1.TopicAdminClient
    import com.google.cloud.pubsub.v1.TopicAdminSettings
    import com.google.pubsub.v1.ProjectSubscriptionName
    import com.google.pubsub.v1.PushConfig
    import com.google.pubsub.v1.TopicName
    import io.grpc.ManagedChannelBuilder
    import io.micronaut.context.ApplicationContext
    import io.micronaut.runtime.server.EmbeddedServer
    import no.myproject.configuration.GcpConfigProperties
    import no.myproject.configuration.PubSubConfigProperties
    import no.myproject.testframework.testcontainers.PubSubEmulatorFixture
    import spock.lang.AutoCleanup
    import spock.lang.Shared
    import spock.lang.Specification
    
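    // EnvironmentFixture is not shown here; it presumably supplies the
    // `environments` value passed to ApplicationContext.run further down.
    abstract class PubSubSpecification extends Specification
            implements PubSubEmulatorFixture, EnvironmentFixture {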
    
        @AutoCleanup
        @Shared
        EmbeddedServer embeddedServer
    
        @AutoCleanup
        @Shared
        ApplicationContext applicationContext
    
        def setupSpec() {
    
            // start the pubsub emulator
            def emulatorHost = getPubSubConfiguration().get("pubsub.emulator-host")
    
            // start a temporary applicationContext in order to read config
            // keep any pubsub subscriptions out of context at this stage
            applicationContext = ApplicationContext.run()
    
            def gcpConfigProperties = applicationContext.getBean(GcpConfigProperties)
            def pubSubConfigProperties = applicationContext.getBean(PubSubConfigProperties)
    
            def channel = ManagedChannelBuilder.forTarget("dns:///" + emulatorHost)
                    .usePlaintext()
                    .build()
    
            def channelProvider =
                    FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
    
            // START creating topic
    
            def topicAdminClient =
                    TopicAdminClient.create(
                            TopicAdminSettings.newBuilder()
                                    .setCredentialsProvider(NoCredentialsProvider.create())
                                    .setTransportChannelProvider(channelProvider)
                                    .build())
    
            def topic = TopicName.of(
                    gcpConfigProperties.getProjectId(),
                    pubSubConfigProperties.getTopicName())
    
            try {
                topicAdminClient.createTopic(topic)
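            } catch (com.google.api.gax.rpc.AlreadyExistsException ignored) {
                // A bare `catch (AlreadyExistsException)` would declare an untyped
                // variable and swallow every exception, so the type is spelled out.
                // The topic already exists, which is fine; confirm it is reachable.
                topicAdminClient.getTopic(topic)
            }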
    
            // START creating subscription
    
            pubSubConfigProperties.getSubscriptionNames().forEach(it -> {
                def subscription =
                        ProjectSubscriptionName.of(gcpConfigProperties.getProjectId(), it)
    
                def subscriptionAdminClient =
                        SubscriptionAdminClient.create(
                                SubscriptionAdminSettings.newBuilder()
                                        .setTransportChannelProvider(channelProvider)
                                        .setCredentialsProvider(NoCredentialsProvider.create())
                                        .build())
    
                try {
                    subscriptionAdminClient
                            .createSubscription(
                                    subscription,
                                    topic,
                                    PushConfig.getDefaultInstance(),
                                    100)
    
                    System.out.println("Subscription created " + subscriptionAdminClient.getSubscription(subscription))
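                } catch (com.google.api.gax.rpc.AlreadyExistsException ignored) {
                    // Typed catch: only "already exists" is tolerated here.
                    // The subscription already exists; confirm it is reachable.
                    subscriptionAdminClient.getSubscription(subscription)
                }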
            })
    
            channel.shutdown()
    
            // stop the temporary applicationContext
            applicationContext.stop()
    
            // start the actual applicationContext
            embeddedServer = ApplicationContext.run(
                    EmbeddedServer,
                    [
                            'spec.name'           : "PubSubEmulatorSpec",
                            "pubsub.emulator.host": emulatorHost
                    ],
                    environments)
    
            applicationContext = embeddedServer.applicationContext
        }
    }
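When debugging the specification above, it can help to publish straight to the emulator and bypass Micronaut entirely. The helper below is a sketch along the lines of the Testcontainers documentation; the method name publishToEmulator and its parameters are hypothetical, and it assumes google-cloud-pubsub is on the test classpath and that the topic already exists in the emulator.

```groovy
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.FixedTransportChannelProvider
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.TopicName
import io.grpc.ManagedChannelBuilder

import java.util.concurrent.TimeUnit

// Publishes one message to the emulator and returns the message id it assigns.
String publishToEmulator(String emulatorHost, String projectId, String topicId, String payload) {
    // The emulator speaks plaintext gRPC, so TLS and credentials are disabled.
    def channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build()
    try {
        def channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
        def publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
                .setChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build()
        try {
            def message = PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(payload))
                    .build()
            return publisher.publish(message).get(10, TimeUnit.SECONDS)
        } finally {
            publisher.shutdown()
            publisher.awaitTermination(10, TimeUnit.SECONDS)
        }
    } finally {
        channel.shutdownNow()
    }
}
```

Inside a spec that extends PubSubSpecification, the host could be taken from getPubSubConfiguration().get("pubsub.emulator-host"), with the project id and topic name read from the same config beans that setupSpec uses.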
    

    Then a factory class (Groovy) for mocking credentials

    package no.myproject.pubsub
    
    import com.google.auth.oauth2.AccessToken
    import com.google.auth.oauth2.GoogleCredentials
    import io.micronaut.context.annotation.Factory
    import io.micronaut.context.annotation.Replaces
    import io.micronaut.context.annotation.Requires
    
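    // Micronaut 3 uses jakarta.inject by default; javax.inject only works
    // if that dependency has been added explicitly.
    import jakarta.inject.Singleton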
    
    
    @Factory
    @Requires(property = 'spec.name', value = 'PubSubEmulatorSpec')
    class EmptyCredentialsFactory {
    
        @Singleton
        @Replaces(GoogleCredentials)
        GoogleCredentials mockCredentials() {
            return GoogleCredentials.create(new AccessToken("", new Date()))
        }
    }
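The test spec that follows refers to a DocumentPublisher and an IncomingDocListenerWithAck that are not shown in this answer. For orientation, a publisher and listener pair in micronaut-gcp-pubsub might look roughly like the sketch below. Everything here is an assumption: the type names SketchPublisher and CountingListener, the topic and subscription names, and the byte[] payload are all hypothetical, and the real classes in the repo may differ (for example by using manual acknowledgement).

```groovy
package no.myproject.pubsub

import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.annotation.Topic

import java.util.concurrent.atomic.AtomicInteger

// Declarative publisher; Micronaut generates the implementation at compile time.
@PubSubClient
interface SketchPublisher {

    @Topic("documents") // hypothetical topic name
    void send(byte[] data)
}

// Listener that only counts what it receives, so a test can compare counts.
@PubSubListener
class CountingListener {

    private final AtomicInteger receiveCount = new AtomicInteger()

    @Subscription("documents-subscription") // hypothetical subscription name
    void onMessage(byte[] data) {
        receiveCount.incrementAndGet()
    }

    int getReceiveCount() {
        receiveCount.get()
    }
}
```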
    

    And finally, a Spock test spec.

    package no.myproject.pubsub
    
    import no.myproject.testframework.PubSubSpecification
    
    import java.util.stream.IntStream
    
    class PubSubIntegrationSpec extends PubSubSpecification {
    
        def NUMBER_OF_MESSAGES_IN_TEST = 5
        def DELAY_IN_MILLISECONDS_PER_MSG = 100
    
        def "when a number of messages is sent, same amount of messages is received"() {
            given:
            def documentPublisher = applicationContext.getBean(DocumentPublisher)
            def listener = applicationContext.getBean(IncomingDocListenerWithAck)
            def initialReceiveCount = listener.getReceiveCount()
    
            when:
            IntStream.rangeClosed(1, NUMBER_OF_MESSAGES_IN_TEST)
                    .forEach(it -> documentPublisher.send("Hello World!"))
    
            // wait a bit in order to let all messages propagate through the queue
            Thread.sleep(NUMBER_OF_MESSAGES_IN_TEST * DELAY_IN_MILLISECONDS_PER_MSG)
    
            then:
            NUMBER_OF_MESSAGES_IN_TEST == listener.getReceiveCount() - initialReceiveCount
        }
    }
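The fixed Thread.sleep in the test above can make the test either slow or flaky, depending on how fast the emulator delivers. One option is Spock's PollingConditions, which retries an assertion until it passes or a timeout expires. The sketch below demonstrates the technique in isolation: the AtomicInteger stands in for listener.getReceiveCount(), the background thread stands in for asynchronous delivery, and the timeout values are arbitrary.

```groovy
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.util.concurrent.atomic.AtomicInteger

class PollingSketchSpec extends Specification {

    def "counter eventually reaches the expected value"() {
        given:
        def received = new AtomicInteger() // stand-in for the listener's receive count
        def expected = 5
        // Retry for up to 5 seconds, checking every 100 ms.
        def conditions = new PollingConditions(timeout: 5, delay: 0.1)

        when: "messages arrive asynchronously"
        Thread.start {
            expected.times {
                Thread.sleep(50)
                received.incrementAndGet()
            }
        }

        then: "the assertion is retried until it holds or the timeout expires"
        conditions.eventually {
            assert received.get() == expected
        }
    }
}
```

In the integration spec, the body of eventually would instead compare listener.getReceiveCount() - initialReceiveCount with NUMBER_OF_MESSAGES_IN_TEST.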