java, apache-spark, amazon-s3, orc, presto

Spark: "No output operations registered, so nothing to execute" but I'm writing to a file

val sc = new SparkContext(conf)

val streamContext = new StreamingContext(sc, Seconds(1))

val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
  .at(Regions.US_EAST_1)
  .withTimeout(5))

val jsonRows = sqs.mapPartitions(partitions => {
  val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))

  val txfm = new LogLine2Json
  val log = Logger.getLogger("parseLog")
  val sqlSession = SparkSession
    .builder()
    .getOrCreate()

  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val outputPath = "/tmp/spark/presto"

  partitions.map(messages => {
    val sqsMsg = Json.parse(messages)
    System.out.println(sqsMsg)

    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
    System.out.println(bucketName)
    System.out.println(key)
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
    val stream = obj.getObjectContent()
    scala.io.Source.fromInputStream(stream).getLines().map(line => {
        try{
          val str = txfm.parseLine(line)
          val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
          jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
        }
        catch {
          case e: Throwable => {log.info(line); "";}
        }
      }).filter(line => line != "{}")
    })
})

streamContext.start()
streamContext.awaitTermination()

My job is really simple: we take an S3 key from SQS. The content of the file is an nginx log, and we parse it using our parser, LogLine2Json, which is working fine. It converts each log line to JSON format, and then we write the result out in ORC format.

But I'm getting this error:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at SparrowOrc$.main(sparrowOrc.scala:159)
    at SparrowOrc.main(sparrowOrc.scala)

I understand that Spark needs an action, otherwise it won't execute anything. But I do have this code to write to an ORC file. Do I have to do anything else?

jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)

Solution

  • First of all, map is not an action. It is a transformation, so Spark has no reason to execute this code.

    Next, you should avoid side effects in transformations: transformations may be re-executed after a failure, so a side effect can run zero or multiple times, and you should never rely on one if correctness of the output is required.

    Finally, using standard I/O functions and local paths (such as /tmp) in a distributed system is typically meaningless: the code runs on executors, so System.out.println output and files written to a local path end up scattered across the worker nodes.

    Overall, you should review the existing options for DStream sinks, and if none of these is suitable in your scenario, write your own using an action (foreachRDD, within which you can use foreach or foreachPartition on each RDD); see the sketch below.
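
    Here is a minimal sketch of how the same pipeline could be restructured around foreachRDD, which is a DStream output operation, so the streaming graph then has something to execute. It reuses sqs, LogLine2Json, BasicCredentialsProvider, sparrowSchema, and the output path from the question (plus the same AWS and JSON imports); treat it as a starting point rather than a definitive implementation.

    sqs.foreachRDD { rdd =>
      // Fetch and parse the S3 objects on the executors; mapPartitions lets us
      // create one S3 client per partition instead of one per message.
      val jsonLines = rdd.mapPartitions { messages =>
        val s3Client = new AmazonS3Client(new BasicCredentialsProvider(
          sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
        val txfm = new LogLine2Json
        messages.flatMap { message =>
          val sqsMsg = Json.parse(message)
          val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
          val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
          val stream = s3Client.getObject(new GetObjectRequest(bucketName, key)).getObjectContent()
          scala.io.Source.fromInputStream(stream).getLines().flatMap { line =>
            try Some(txfm.parseLine(line))      // one JSON string per log line
            catch { case e: Throwable => None } // drop lines the parser rejects
          }
        }
      }

      // Write the whole micro-batch once, as a driver-side action, instead of
      // calling .save() per line inside a transformation.
      val spark = SparkSession.builder().getOrCreate()
      import spark.implicits._
      val jsonDf = spark.read.schema(sparrowSchema.schema).json(jsonLines.toDS())
      jsonDf.write.mode("append").format("orc").option("compression", "zlib").save("/tmp/spark/presto")
    }

    The key difference is that the write is now issued once per micro-batch from the driver, so it actually runs; on a real cluster you would also replace the local /tmp path with a shared file system such as S3 or HDFS.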