I've written a function for transforming an object into a message:
@Bean
public Function<FabricPayload<T>, Message<FabricPayload<T>>> publish() {
    return payload -> {
        // Guard: emitter content should be a business object or domain entity
        if (payload.getContent().getClass().isPrimitive())
            throw new IllegalArgumentException("Event emitter content should be business object or domain entity");
        FabricEvent event = payload.getEvent();
        String kafkaTopic = String.join("-", category, app, runtimeEnv);
        // Carry realm/account/user context in the auth header and use realm/account as the record key
        Message<FabricPayload<T>> message = MessageBuilder.withPayload(payload)
                .setHeader(fabricAuthHeader, String.join(":", event.getRealmId(), event.getAccountId(), event.getUserId()))
                .setHeader(KafkaHeaders.MESSAGE_KEY, String.join(":", event.getRealmId(), event.getAccountId()))
                .setHeader(KafkaHeaders.TOPIC, kafkaTopic)
                .build();
        log.debug("Publishing message {}", message);
        return message;
    };
}
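For reference, the payload and event shapes this function expects look roughly like this (a sketch inferred from the getters and builder used above and in the test below, not the actual event-library classes):

@Getter
@Builder
public class FabricPayload<T> {
    private final FabricEvent event;  // realm/account/user context for the event
    private final T content;          // the business object or domain entity being published
}

@Getter
@Builder
public class FabricEvent {
    private final String realmId;
    private final String accountId;
    private final String userId;
}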
This function is in the com.foo.bar.fabric.nh44.functions.producer package; similarly, I have a consumer for Kafka messages in the com.foo.bar.fabric.nh44.functions.consumer package. The idea is to bundle this code into a JAR library (event-library) and use it across all our microservices for event-driven communication. To document how these components are used in a microservice, I wrote a test under the com.foo.bar.example package in the src/test/java directory of the event-library code.
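The consumer function itself is not shown here. A minimal sketch of what it could look like, assuming a functional bean named consume that receives the same FabricPayload (the bean name and payload handling are inferred from the consume-in-0 binding in the test below, not taken from the actual library code):

@Slf4j
@Configuration
public class EventConsumerConfiguration {

    // java.util.function.Consumer; the bean name "consume" matches the consume-in-0 binding
    @Bean
    public Consumer<Message<FabricPayload<Object>>> consume() {
        return message -> {
            FabricPayload<Object> payload = message.getPayload();
            log.debug("Received event {} with content {}", payload.getEvent(), payload.getContent());
            // delegate to domain-specific handling here
        };
    }
}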
The test passes only if I add both @ComponentScan(basePackages = {"com.foo.bar"}) and spring.cloud.function.scan.packages:
@SpringBootApplication
@ComponentScan(basePackages = {"com.foo.bar"})
class ExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}
And then the test:
@Slf4j
@SpringBootTest
@EmbeddedKafka(
        brokerProperties = {"listeners=PLAINTEXT://localhost:11092"},
        topics = {"fabric-piglet-dev", "alm-nimble-dev"},
        partitions = 1
)
@TestPropertySource(properties = {
        "foo.bar.fabric.nh44.category=fabric",
        "runtime-env=dev",
        "spring.application.name=piglet",
        "spring.cloud.function.scan.packages=com.foo.bar.fabric.nh44.functions.consumer,com.foo.bar.fabric.nh44.functions.producer",
        "spring.cloud.function.definition=publish;consume",
        "spring.cloud.stream.default.contentType=application/json",
        "spring.cloud.stream.kafka.binder.autoCreateTopics=false",
        "spring.cloud.stream.kafka.binder.brokers=localhost:11092",
        "spring.cloud.stream.kafka.default.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "spring.cloud.stream.kafka.default.producer.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer",
        "spring.cloud.stream.kafka.default.producer.configuration.max.block.ms=100",
        "spring.cloud.stream.kafka.default.consumer.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "spring.cloud.stream.kafka.default.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
        "spring.cloud.stream.bindings.publish-out-0.destination=${foo.bar.fabric.nh44.category}-${spring.application.name}-${runtime-env}",
        "spring.cloud.stream.bindings.consume-in-0.group=${foo.bar.fabric.nh44.category}-${spring.application.name}-consumer-${runtime-env}",
        "spring.cloud.stream.bindings.consume-in-0.destination=alm-nimble-dev",
        "foo.bar.fabric.auth-header=X-FABRIC-CORRELATION"
})
class ExampleApplicationTests {

    // fields and autowiring

    @Test
    @DisplayName("SHOULD publish to kafka after entity is saved")
    void testPublishesLifeCycleEventsToKafka() {
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer", "false", kafkaBroker));
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        Consumer<String, Object> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), jsonDeserializer).createConsumer();
        consumer.subscribe(singleton("fabric-piglet-dev"));

        ExampleEntity entity = // build entity
        FabricEvent event = // build event
        streamBridge.send("publish-in-0", "kafka", FabricPayload.builder().event(event).content(entity).build(), MimeType.valueOf("application/json"));

        ConsumerRecord<String, Object> received = KafkaTestUtils.getSingleRecord(consumer, "fabric-piglet-dev", 2000);
        Assertions.assertNotNull(received, "Verifying record received");
        Assertions.assertEquals("r1:a1:u1", new String(received.headers().lastHeader(fabricAuthHeader).value(), StandardCharsets.UTF_8));
        consumer.close();
    }
}
Shouldn't spring.cloud.function.scan.packages be enough to register the functions as beans?
In the application.yml file, declare the functions:
spring:
  application:
    name: app-name
  cloud:
    function:
      definition: test1Func;test2Func
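Keep in mind that spring.cloud.function.definition only selects which function beans get bound; the beans themselves still have to be registered in the application context, for example through component scanning of the configuration class that declares them. A minimal sketch of bean declarations matching the placeholder names above, assuming simple String functions:

@Configuration
public class FunctionConfiguration {

    // Bean names must match the entries in spring.cloud.function.definition
    @Bean
    public Function<String, String> test1Func() {
        return String::toUpperCase;
    }

    @Bean
    public Function<String, String> test2Func() {
        return String::toLowerCase;
    }
}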