Tags: java, spring-boot, spring-cloud-stream, spring-cloud-stream-binder-kafka, spring-cloud-function

Spring Cloud Function doesn't register multiple function definitions


I've written a function that transforms an object into a message:

@Bean
public Function<FabricPayload<T>, Message<FabricPayload<T>>> publish() {

    return payload -> {
        if (payload.getContent().getClass().isPrimitive())
            throw new IllegalArgumentException("Event emitter content should be business object or domain entity");

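        // Assumption: the event travels on the payload, as the test's FabricPayload.builder().event(...) suggests.
        FabricEvent event = payload.getEvent();
        String kafkaTopic = String.join(":", category, app, runtimeEnv);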
        Message<FabricPayload<T>> message = MessageBuilder.withPayload(payload)
                .setHeader(fabricAuthHeader, String.join(":", event.getRealmId(), event.getAccountId(), event.getUserId()))
                .setHeader(KafkaHeaders.MESSAGE_KEY, String.join(":", event.getRealmId(), event.getAccountId()))
                .setHeader(KafkaHeaders.TOPIC, kafkaTopic).build();
        log.debug("Publishing message {}", message);
        return message;
    };
}
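
A side note on the guard in this lambda: calling getClass() on an object reference never yields a primitive type, because a primitive value stored in a generic field is boxed first (an int becomes an Integer). The check therefore never throws. The sketch below demonstrates this with plain Java and shows one possible alternative. The class name GuardSketch and the list of rejected types (including String) are assumptions for illustration and are not part of the library described here.

```java
import java.util.Set;

// Hypothetical demo class, not part of event-library.
public class GuardSketch {

    // Types treated as "not a business object"; this list is an assumption.
    private static final Set<Class<?>> SIMPLE_TYPES = Set.of(
            Boolean.class, Byte.class, Character.class, Short.class,
            Integer.class, Long.class, Float.class, Double.class, String.class);

    // Mirrors the guard from the question: false for every object instance.
    public static boolean originalGuard(Object content) {
        return content.getClass().isPrimitive();
    }

    // Alternative guard: true for boxed primitives and strings.
    public static boolean isSimpleValue(Object content) {
        return SIMPLE_TYPES.contains(content.getClass());
    }

    public static void main(String[] args) {
        Object boxed = 42; // autoboxed to Integer
        System.out.println(originalGuard(boxed)); // false
        System.out.println(isSimpleValue(boxed)); // true
    }
}
```

Whether strings should count as invalid content depends on what "business object or domain entity" is meant to cover, so the set would need adjusting to the real rules.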

This function lives in the com.foo.bar.fabric.nh44.functions.producer package; similarly, I have a consumer for Kafka messages in the com.foo.bar.fabric.nh44.functions.consumer package. The idea is to bundle this code into a JAR library (event-library) and then use it across all our microservices for event-driven communication.

To document how microservices should use these components, I wrote tests under the com.foo.bar.example package in the src/test/resources directory of the event-library code.

The test passes only if I add both @ComponentScan(basePackages = {"com.foo.bar"}) and spring.cloud.function.scan.packages:

@SpringBootApplication
@ComponentScan(basePackages = {"com.foo.bar"})
class ExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}

And then the test:

@Slf4j
@SpringBootTest
@EmbeddedKafka(
        brokerProperties = {"listeners=PLAINTEXT://localhost:11092"},
        topics = {"fabric-piglet-dev", "alm-nimble-dev"},
        partitions = 1
)
@TestPropertySource(properties = {
        "foo.bar.fabric.nh44.category=fabric",
        "runtime-env=dev",
        "spring.application.name=piglet",
        "spring.cloud.function.scan.packages=com.foo.bar.fabric.nh44.functions.consumer,com.foo.bar.fabric.nh44.functions.producer",
        "spring.cloud.function.definition=publish;consume",
        "spring.cloud.stream.default.contentType=application/json",
        "spring.cloud.stream.kafka.binder.autoCreateTopics=false",
        "spring.cloud.stream.kafka.binder.brokers=localhost:11092",
        "spring.cloud.stream.kafka.default.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "spring.cloud.stream.kafka.default.producer.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer",
        "spring.cloud.stream.kafka.default.producer.configuration.max.block.ms=100",
        "spring.cloud.stream.kafka.default.consumer.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "spring.cloud.stream.kafka.default.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
        "spring.cloud.stream.bindings.publish-out-0.destination=${foo.bar.fabric.nh44.category}-${spring.application.name}-${runtime-env}",
        "spring.cloud.stream.bindings.consume-in-0.group=${foo.bar.fabric.nh44.category}-${spring.application.name}-consumer-${runtime-env}",
        "spring.cloud.stream.bindings.consume-in-0.destination=alm-nimble-dev",
        "foo.bar.fabric.auth-header=X-FABRIC-CORRELATION"
})
class ExampleApplicationTests {
    
    // fields and autowiring

    @Test
    @DisplayName("SHOULD publish to kafka after entity is saved")
    void testPublishesLifeCycleEventsToKafka() {
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer", "false", kafkaBroker));
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        Consumer<String, Object> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), jsonDeserializer).createConsumer();
        consumer.subscribe(singleton("fabric-piglet-dev"));
        ExampleEntity entity = // build entity

        FabricEvent event = //build event

        streamBridge.send("publish-in-0", "kafka", FabricPayload.builder().event(event).content(entity).build(), MimeType.valueOf("application/json"));
        ConsumerRecord<String, Object> received = KafkaTestUtils.getSingleRecord(consumer, "fabric-piglet-dev", 2000);
        Assertions.assertNotNull(received, "Verifying record received");
        Assertions.assertEquals("r1:a1:u1", new String(received.headers().lastHeader(fabricAuthHeader).value(), StandardCharsets.UTF_8));
        consumer.close();
    }
}

Shouldn't spring.cloud.function.scan.packages be enough to register the functions as beans?


Solution

  • Declare the functions in the application.yml file, separating the function names with semicolons.

    spring:
     application:
      name: app-name
     cloud:
      function:
       definition: test1Func;test2Func
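
To make the semicolon separator concrete: as far as I understand, Spring Cloud Stream treats each ;-separated name in spring.cloud.function.definition as a separate function to bind, and derives binding names of the form <name>-in-<index> and <name>-out-<index>. That is where publish-out-0 and consume-in-0 in the question come from. The plain-Java sketch below only mimics that naming convention. The class DefinitionSketch and its methods are hypothetical and are not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring class: it only illustrates the naming convention.
public class DefinitionSketch {

    // Split a definition such as "publish;consume" into individual function names.
    public static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Binding name for the first input of a function, e.g. "consume-in-0".
    public static String inputBinding(String functionName) {
        return functionName + "-in-0";
    }

    // Binding name for the first output of a function, e.g. "publish-out-0".
    public static String outputBinding(String functionName) {
        return functionName + "-out-0";
    }

    public static void main(String[] args) {
        // Print the candidate binding names for each declared function.
        for (String name : functionNames("publish;consume")) {
            System.out.println(name + ": " + inputBinding(name) + ", " + outputBinding(name));
        }
    }
}
```

Note that a Consumer such as consume only has an input binding and a Supplier only has an output binding; the sketch prints both names for every function purely to show the pattern.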