Tags: java, apache-spark, redis, spring-data-redis, spark-redis

Read data saved by spark redis using Java


I am using spark-redis to save a Dataset to Redis. Then I read this data back using Spring Data Redis:

This is the object I save to Redis:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;

    @Indexed
    private int user;

    @Indexed
    private String product;

    private double productN;
    private double rating;
    private float prediction;

    public static RatingResult convert(Row row) {
        int user = row.getAs("user");
        String product = row.getAs("product");
        double productN = row.getAs("productN");
        double rating = row.getAs("rating");
        float prediction = row.getAs("prediction");
        String id = user + product;

        return RatingResult.builder().id(id).user(user).product(product).productN(productN).rating(rating)
                .prediction(prediction).build();
    }

}
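Note how `convert()` builds the id: Java's `int + String` concatenation converts the int to its decimal string, so the resulting id is e.g. `"1product1"`. A minimal plain-Java sketch (no Spark needed; the values are taken from the example row above):

```java
public class IdExample {
    public static void main(String[] args) {
        int user = 1;
        String product = "product1";
        // int + String concatenation: the int is converted to "1" first
        String id = user + product;
        System.out.println(id); // prints "1product1"
    }
}
```

This is the exact value you would later have to pass to `findById()` on the Spring Data Redis side.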

Saving the object with spark-redis:

JavaRDD<RatingResult> result = ...
...
sparkSession.createDataFrame(result, RatingResult.class).write().format("org.apache.spark.sql.redis")
            .option("table", "collaborative_filtering").mode(SaveMode.Overwrite).save();

Repository:

@Repository
public interface RatingResultRepository extends JpaRepository<RatingResult, String> {

}

I can't read the data that was saved in Redis using Spring Data Redis, because the data structures written by spark-redis and Spring Data Redis are not the same (I verified that the keys and values created by spark-redis and Spring Data Redis differ, using the commands `redis-cli -p 6379 keys \*` and `redis-cli hgetall $key`).

So how can I read this saved data in Java, with any Java library?


Solution

  • The following works for me.

    Writing data with spark-redis.

    I use Scala here, but it's essentially the same as you do in Java. The only thing I changed is I added a .option("key.column", "id") to specify the hash id.

        val ratingResult = new RatingResult("1", 1, "product1", 2.0, 3.0, 4)
    
        val result: JavaRDD[RatingResult] = spark.sparkContext.parallelize(Seq(ratingResult)).toJavaRDD()
        spark
          .createDataFrame(result, classOf[RatingResult])
          .write
          .format("org.apache.spark.sql.redis")
          .option("key.column", "id")
          .option("table", "collaborative_filtering")
          .mode(SaveMode.Overwrite)
          .save()
    

    In spring-data-redis I have the following:

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    @RedisHash("collaborative_filtering")
    public class RatingResult implements Serializable {
        private static final long serialVersionUID = 8755574422193819444L;
    
        @Id
        private String id;
    
        @Indexed
        private int user;
    
        @Indexed
        private String product;
    
        private double productN;
        private double rating;
        private float prediction;
    
        @Override
        public String toString() {
            return "RatingResult{" +
                    "id='" + id + '\'' +
                    ", user=" + user +
                    ", product='" + product + '\'' +
                    ", productN=" + productN +
                    ", rating=" + rating +
                    ", prediction=" + prediction +
                    '}';
        }
    }
    

    I use CrudRepository instead of JPA:

    @Repository
    public interface RatingResultRepository extends CrudRepository<RatingResult, String> {
    
    }
    

    Querying:

    RatingResult found = ratingResultRepository.findById("1").get();
    System.out.println("found = " + found);
    

    The output:

    found = RatingResult{id='null', user=1, product='product1', productN=2.0, rating=3.0, prediction=4.0}
    

    You may notice that the id field was not populated, because spark-redis stores the id as part of the Redis hash key rather than as a hash attribute.
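If you do need the id after reading, you can recover it from the Redis key itself: with `key.column` set, spark-redis writes each row under a key of the form `<table>:<id>` (e.g. `collaborative_filtering:1`). A hedged plain-Java sketch of stripping the keyspace prefix (the key string here is an assumed example, not read from a live Redis instance):

```java
public class KeyIdExample {
    public static void main(String[] args) {
        // Example key as spark-redis would write it for table
        // "collaborative_filtering" with id "1"
        String key = "collaborative_filtering:1";
        String keyspace = "collaborative_filtering:";
        // Strip the table prefix to recover the id portion of the key
        String id = key.startsWith(keyspace) ? key.substring(keyspace.length()) : key;
        System.out.println(id); // prints "1"
    }
}
```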