I have 2 routes which intends to filter user's messages coming from kafka based on a blacklist of user's ids stored in caffeine cache.
The first route that loads a txt file containing ids of blacklisted users at startup, is defined as follow :
from(file(...))
.id(getRouteId())
.split().tokenize("\n")
.stopOnException()
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionTimeout(500)
.setHeader(CaffeineConstants.ACTION, constant(CaffeineConstants.ACTION_PUT))
.setHeader(CaffeineConstants.KEY, constant("blacklistedIds"))
.toF("caffeine-cache://%s", "blacklistedIds")
Note: the blacklist consist of a List<String> blacklistedIds
and it's stored in caffeine cache (which get initialised at startup of the application).
The second route which get user's messages from kafka (User properties : id,firstname,lastname
), and it's defined as follow:
from(kafka(...))
.id(getRouteId())
.unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
// filter here (based on cache) to only let go authorized users
//.filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
.to(output())
My question is how can I do to filter the incoming message by using the blacklistIds stored in caffeine cache ? How can I get the cache in UserMessageFilterService#isAuthorizedUser
bean's method ? Is there a better/simple way to achieve this ?
You can pass the message body and arbitrary headers to isAuthorizedUser()
from(kafka(...))
...
// save the UserMessage in a header
.setHeader("userMessage", body())
// retrieve the cached list of blacklisted Ids
.setHeader(CaffeineConstants.CamelCaffeineAction, CaffeineConstants.GET)
.setHeader(CaffeineConstants.CamelCaffeineKey, constant("blacklistedIds"))
.to("caffeine-cache:blacklistedIds)
.filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
// restore the original body
.setBody(header("userMessage"))
UserMessageFilterService.java
public boolean isAuthorizedUser(List<String> blacklistedIds, @Header("userMessage") UserMessage userMessage) {
return !blacklistedIds.contains(userMessage.getUserId());
}