apache-spark, apache-kafka, spark-structured-streaming, memorystream, spark-java

Configuring Apache Spark's MemoryStream to simulate Kafka stream


I was asked to look into using Apache Spark's MemoryStream to simulate a Kafka stream in a Java Spring Boot service. The documentation and online discussion on this topic are sparse, so I am seeking help.

This is my implementation code.

final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);

final Dataset<Row> df = kafkaDataStreamReader.load();

return df.writeStream().foreachBatch((batch, batchId) -> {
    // Process a batch of data received from Kafka
    updateData(name, customizerFunction, avroSchema, batch);
  });
  • KafkaLoader is a class which, depending on the profile (it/prod), configures the Kafka stream differently. It returns a DataStreamReader, which might be the reason I'm struggling to create a MemoryStream (a rough sketch of the abstraction follows below).
  • Next, in the writeStream I'm writing to my output destinations.
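For reference, the KafkaLoader abstraction itself isn't shown here; a rough, simplified sketch of what it looks like (the option names and the prod wiring below are placeholders, not the real configuration) is:

import java.util.Map;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

// Base class: each profile-specific loader decides how the stream gets configured.
public abstract class KafkaLoader {
  public abstract DataStreamReader getStream(SparkSession sparkSession,
                                             Map<String, Object> options);
}

// prod profile (a separate file in reality): wires up the real Kafka source.
@Service
@Profile("prod")
public class ProdKafkaLoader extends KafkaLoader {
  @Override
  public DataStreamReader getStream(SparkSession sparkSession, Map<String, Object> options) {
    return sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", String.valueOf(options.get("bootstrapServers")))
        .option("subscribe", String.valueOf(options.get("topic")));
  }
}

The it-profile implementation, where I'm trying to use MemoryStream, is below.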
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {
  @Autowired
  SparkSession sparkSession;

  @SneakyThrows
  @Override
  public DataStreamReader getStream(SparkSession sparkSession, Map<String,Object> options) {
    options = Map.of();
    MemoryStream<String> stream = null;
    try {
      stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
      String jsonString = "{..}";

      Seq<String> seq = JavaConverters
        .collectionAsScalaIterableConverter(List.of(jsonString))
        .asScala()
        .toSeq();

      Offset currentOffset = stream.addData(seq);
      stream.commit(currentOffset);
    } catch (Exception e){
      log.warn("Error creating MemoryStream: ", e);
      return new DataStreamReader(sparkSession);
    }
    Dataset<Row> data = stream.toDF();
    log.debug("Stream enabled [t/f]: {}", data.isStreaming());
    return data
      .sqlContext()
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "test-servers")
      .option("subscribe", "test-data");
  }
}

ItKafkaLoader is called when I'm running integration tests (hence ActiveProfiles is set to it), and it is where I'm struggling to create a MemoryStream. Because my implementation code expects a returned object of type DataStreamReader, I believe I need to call readStream(), since that is what returns a DataStreamReader. However, when I just call readStream() Spark throws an exception about my path not being defined.

java.lang.IllegalArgumentException: 'path' is not specified
    at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError

When searching this error, the usual advice is to set the format to kafka. But doing that means Spark then expects a topic and a broker. I was hoping that, since I was using MemoryStream, Spark would recognize this as a dummy Kafka cluster and topic and go about kicking off my simulated Kafka stream through my MemoryStream. That doesn't happen, and when I run my integration test I get these errors.

- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers

Ideally, I would like to figure out how to fix getStream() in ItKafkaLoader; however, I have a slight feeling that I don't understand what MemoryStream is really for and might need to do something different.

Update: I have seen that in newer versions of Spark you can just set the format to memory; however, it appears that my Spark version (2.12) does not support that, and I do not have the green light to upgrade my Spark version.


Solution

  • OK, I figured out how to make MemoryStream work for my need of simulating a stream.

    • Configuring my MemoryStream as a DataStreamReader wasn't the move.
      • Starting the stream via readStream()/load() required a format, and a format causes the need for a path
      • If I wanted to do that I should've used the memory format; however, that option isn't available in my version of Spark (2.12)
    • The workaround was to just add data to the MemoryStream and then rely on the writeStream() in my implementation code to utilize my simulated stream.
      • I changed the return type of my getStream() function to Dataset instead of DataStreamReader

    Here are my code changes:

    @Profile("it")
    public class ItKafkaLoader extends KafkaLoader {
    
      @SneakyThrows
      @Override
      public Dataset<Row> getStream(SparkSession sparkSession, Map<String, String> options) {
        MemoryStream<Data> stream;
        try {
          Encoder<Data> encoder = Encoders.bean(Data.class);
    
          stream = new MemoryStream<>(1, sparkSession.sqlContext(),null, encoder);
    
          List<Data> data = getData();
    
          Dataset<Data> df = sparkSession.createDataset(data, encoder);
    
          Seq<Data> seqT = JavaConverters
            .asScalaIteratorConverter(df.toLocalIterator())
            .asScala()
            .toSeq();
    
          stream.addData(seqT);
    
        } catch (Exception e) {
          log.warn("Error creating MemoryStream: ", e);
          return sparkSession.emptyDataFrame();
        }
        Dataset<Row> data = stream.toDF();
        log.debug("Stream enabled [t/f]: {}", data.isStreaming());
        return data;
      }
    

    Some things to point out:

    • If I were to just make a MemoryStream<String>, my schema would've had a single default value column where each row is the whole JSON object, instead of the required attributes from my custom Data object.
      • I need to define an encoder so that the data I add to my MemoryStream carries the same schema as my custom Data object (see the bean sketch after the snippet below)
    • The addData() function in Java Spark requires a Scala Seq
      • The simplest way I found to accomplish this is to convert a Java collection to a Seq using the code below
      • I was able to convert both a List and a Dataset/DataFrame to a Seq this way
    Seq<Data> seq = JavaConverters
        .asScalaIteratorConverter(<iterator here, e.g. list.iterator() or df.toLocalIterator()>)
        .asScala()
        .toSeq();
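
    The Data class itself isn't shown here; any plain Java bean with a no-arg constructor and getters/setters works with Encoders.bean. A minimal made-up example (the fields are placeholders) and the schema difference it produces:

    import java.io.Serializable;

    // Hypothetical bean: Encoders.bean(Data.class) derives one column per getter/setter pair.
    public class Data implements Serializable {
      private String id;
      private long timestamp;

      public Data() {}  // no-arg constructor required by the bean encoder

      public String getId() { return id; }
      public void setId(String id) { this.id = id; }
      public long getTimestamp() { return timestamp; }
      public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    // Encoders.STRING()         -> schema: [value: string]  (whole JSON in one column)
    // Encoders.bean(Data.class) -> schema: [id: string, timestamp: bigint]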
    

    Here is how my updated implementation code looks

    Dataset<Row> df = kafkaLoader.getStream(sparkSession, options);
        
    StreamingQuery streamingDf = df.writeStream().foreachBatch((batch, batchId) -> {
            // Process a batch of data received from Kafka
            updateData(name, customizerFunction, avroSchema, batch);
            ...  
        })
        .option("checkpointLocation", "checkpointname-" + name)
        .start();
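
    One extra detail that helps in the integration test itself: start() returns immediately, so an assertion can run before the batch added to the MemoryStream has actually been processed. StreamingQuery's processAllAvailable() blocks until everything currently in the source has been consumed, so the test can do something like:

    // Wait until the data already added to the MemoryStream has gone through foreachBatch,
    // then run assertions on the side effects of updateData(...) and shut the query down.
    streamingDf.processAllAvailable();
    // ... assertions here ...
    streamingDf.stop();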