
Apache Camel: filter records based on a list stored in a Caffeine cache


I have two routes that are meant to filter user messages coming from Kafka against a blacklist of user ids stored in a Caffeine cache.

The first route, which loads a txt file containing the ids of blacklisted users at startup, is defined as follows:

from(file(...))
        .id(getRouteId())
        .split().tokenize("\n")
            .stopOnException()
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
            .completionTimeout(500)
        .setHeader(CaffeineConstants.ACTION, constant(CaffeineConstants.ACTION_PUT))
        .setHeader(CaffeineConstants.KEY, constant("blacklistedIds"))
        .toF("caffeine-cache://%s", "blacklistedIds")

Note: the blacklist is a List<String> blacklistedIds stored in the Caffeine cache, which is initialised when the application starts.

The second route, which receives user messages from Kafka (User properties: id, firstname, lastname), is defined as follows:

from(kafka(...))
        .id(getRouteId())
        .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
        // filter here (based on cache) to only let go authorized users
        //.filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
        .to(output())

How can I filter the incoming messages using the blacklistedIds list stored in the Caffeine cache? How can I access the cache from the UserMessageFilterService#isAuthorizedUser bean method? Is there a better or simpler way to achieve this?


Solution

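  • You can pass the message body and arbitrary headers to isAuthorizedUser(): load the cached list into the body, keep the original UserMessage in a header, and let Camel bind both to the method parameters.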

    from(kafka(...))
        ...
        // save the UserMessage in a header
        .setHeader("userMessage", body())
    
        // retrieve the cached list of blacklisted Ids
        .setHeader(CaffeineConstants.ACTION, constant(CaffeineConstants.ACTION_GET))
        .setHeader(CaffeineConstants.KEY, constant("blacklistedIds"))
        .to("caffeine-cache:blacklistedIds")
    
        .filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
        
        // restore the original body
        .setBody(header("userMessage"))
    

    UserMessageFilterService.java

    public boolean isAuthorizedUser(List<String> blacklistedIds, @Header("userMessage") UserMessage userMessage) {
        return !blacklistedIds.contains(userMessage.getUserId());
    }
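
    The method above throws a NullPointerException if the cache lookup yields no value, and contains() can miss ids that still carry a trailing "\r" or space after the file was split on "\n". Below is a minimal, standalone sketch of a more defensive check. It has no Camel dependency so it can be run on its own. The UserMessage class here is a hypothetical stand-in for the one in the question, and treating a missing blacklist as "let everyone through" is an assumption you may want to invert.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BlacklistFilterSketch {

    /** Hypothetical stand-in for the UserMessage class from the question. */
    static final class UserMessage {
        private final String userId;

        UserMessage(String userId) {
            this.userId = userId;
        }

        String getUserId() {
            return userId;
        }
    }

    /**
     * Returns true when the user may pass the filter.
     * Assumption: a null or empty blacklist (e.g. nothing cached yet) authorizes everyone.
     */
    static boolean isAuthorizedUser(Collection<String> blacklistedIds, UserMessage userMessage) {
        if (blacklistedIds == null || blacklistedIds.isEmpty()) {
            return true;
        }
        // Trim each id and skip blank entries so that leftovers from
        // splitting the file on "\n" (such as "\r") do not hide a match.
        Set<String> normalized = new HashSet<>();
        for (String id : blacklistedIds) {
            if (id != null && !id.trim().isEmpty()) {
                normalized.add(id.trim());
            }
        }
        return !normalized.contains(userMessage.getUserId());
    }

    public static void main(String[] args) {
        List<String> blacklist = Arrays.asList("42\r", "7", "");
        System.out.println(isAuthorizedUser(blacklist, new UserMessage("42"))); // false
        System.out.println(isAuthorizedUser(blacklist, new UserMessage("1")));  // true
        System.out.println(isAuthorizedUser(null, new UserMessage("42")));      // true
    }
}
```

    In the route, the same logic would sit behind the existing method signature with the @Header("userMessage") annotation. Normalizing the ids once when the file is loaded, rather than on every message, would be cheaper if the blacklist is large.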