Tags: spring, spring-boot, spring-webflux, project-reactor, spring-data-cassandra

Filtering a Flux by comparing each element to a single Mono


I am trying to use a Mono holding a user to filter a Flux of courses, keeping only the courses created by that user. I am using Cassandra as the backend; here is the schema and some sample data:

CREATE TABLE main.courses_by_user (
    course_creator text PRIMARY KEY,
    courseid timeuuid,
    description text,
    enrollmentkey text
) WITH additional_write_policy = '99PERCENTILE'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99PERCENTILE';
 course_creator | courseid                             | description | enrollmentkey
----------------+--------------------------------------+-------------+---------------
        rascall | b7757e80-0c24-11ed-aec5-23fe9d87e512 |   Cosmology |        hubble
       dibiasky | b7757e81-0c24-11ed-aec5-23fe9d87e512 |   astronomy |    thebigbang
  michaeljburry | b7753060-0c24-11ed-aec5-23fe9d87e512 |         Lol |         enter
   Noam Chomsky | 6c1a4800-09ac-11ed-ada9-83d934863d60 |          Hi |           Bye

I am using zipWith to pair the Flux of courses with the Mono of the user. Here is the code:

public Flux<CourseByCreator> getMyCourses(Principal placeholder) {
    // Hard-coded user standing in for the authenticated principal
    Mono<User> principal = Mono.just(new User("dibiasky", "whatifitsnotequal", "Kate", "Dibiasky"));
    return this.courseByCreatorRepository.findAll()
            .zipWith(principal)
            .flatMap(tuple -> {
                if (tuple.getT1().getCourseCreator().equals(tuple.getT2().getUsername())) {
                    System.out.println(tuple.getT2().getUsername());
                    return Flux.just(tuple.getT1());
                } else {
                    return Flux.empty();
                }
            }).log();
}

For some reason I am getting an empty result, even though the user's username matches the creator of exactly one course. What am I doing wrong here?


Solution

  • Flux.zipWith:

    Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes.

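    The Mono emits one element and then completes, so zipWith produces at most one tuple: the first course paired with the user. Every other course is dropped, and unless that first course happens to belong to the user, the result is empty.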

    I'd rather first resolve the principal, then filter the courses:

    return Mono
      .just(new User("dibiasky", "whatifitsnotequal","Kate" ,"Dibiasky"))
      .flatMapMany(principal -> {
        return courseByCreatorRepository
          .findAll()
          .filter(course -> course.getCourseCreator().equals(principal.getUsername()));
      })
      .log();
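
    To see the difference between the two approaches without a running Cassandra or Reactor setup, here is a rough plain-Java model of the pairing rule. It is only a sketch: the class and method names (ZipSemantics, zip, viaZip, viaFilter) are made up for illustration, lists stand in for the Flux and the Mono, and it assumes findAll() emits the courses in the order shown in the question's table. Reactor's real operators are asynchronous, but the "stop when either side completes" rule is the same.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model class; not part of Reactor or Spring.
class ZipSemantics {

    // Pairs elements position by position and stops as soon as either side
    // runs out, which is the rule the zipWith documentation describes.
    public static List<String[]> zip(List<String> left, List<String> right) {
        List<String[]> pairs = new ArrayList<>();
        Iterator<String> l = left.iterator();
        Iterator<String> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            pairs.add(new String[] {l.next(), r.next()});
        }
        return pairs;
    }

    // Models the question's code: zip first, then keep pairs whose sides are equal.
    public static List<String> viaZip(List<String> creators, List<String> user) {
        List<String> result = new ArrayList<>();
        for (String[] pair : zip(creators, user)) {
            if (pair[0].equals(pair[1])) {
                result.add(pair[0]);
            }
        }
        return result;
    }

    // Models the answer's code: resolve the user once, then test every course.
    public static List<String> viaFilter(List<String> creators, String username) {
        List<String> result = new ArrayList<>();
        for (String creator : creators) {
            if (creator.equals(username)) {
                result.add(creator);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed emission order, taken from the table in the question.
        List<String> creators = List.of("rascall", "dibiasky", "michaeljburry", "Noam Chomsky");

        // Only ("rascall", "dibiasky") is ever paired, so nothing matches.
        System.out.println(viaZip(creators, List.of("dibiasky")));   // prints []

        // Every creator is compared against the resolved username.
        System.out.println(viaFilter(creators, "dibiasky"));         // prints [dibiasky]
    }
}
```

    The zip-based version would only return the course if it happened to come first, which is why resolving the user once and filtering the whole Flux is the more robust choice.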