
Tail Recursion Use Case Solution in Scala


I am trying to solve a problem using tail recursion. The use case is:

I have a list of folders; each folder has a list of files, and each file has several records. I want to perform some transformation on the records and write them to Kinesis in batches.

val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21,RF22))

I want to write, let's say, two records at a time to Kinesis. So far I have tried:

listOfFolders.map { folder =>
    val files = fetchAllFilesFromFolder(folder)
    if (files.nonEmpty) {
      sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
    } else {
      logger.info(s"No files are present in folder")
    }
  }

  @scala.annotation.tailrec
  def sendBatch(
    files: Seq[Files],
    buffer: Seq[(ByteBuffer, String)],
    numberOfRecordsToSend: Int
  ): Unit =
    files match {
      case Nil => {
        if (buffer.nonEmpty) {
          sendToKinesis(streamName, buffer) map { putDataResult =>
            val putDataList = putDataResult.getRecords.asScala.toList
            logger.info(
              s"Successfully Sent"
            )
          }
        } else {
          logger.info(s"Successfully sent")
        }
      }
      case head :: tail => {
        val fileData = readFileData()
        val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)

        val currentBatch = buffer ++ byteData
        if (currentBatch.size >= numberOfRecordsToSend) {
          sendToKinesis(streamName, buffer)  map { putRecordRes =>
            val putDataList = putRecordRes.getRecords.asScala.toList
            logger.info(
              s"Sent successfully" 
            )
          }
          sendBatch(tail, Seq.empty[(ByteBuffer, String)], 2)
        } else {
          sendBatch(tail, currentBatch, 2)
        }
      }
    }

sendToKinesis uses KCL putRecords.

The problems with the above code are:

  • It reads all the data from a file at once, so if a file has 5 records it sends all 5 records to Kinesis together, even though the batch size is 2.

  • I can't call the tail-recursive method from map.

  • Batches must also span files: if File1 has 3 records, it should send RF11 and RF12 together, then RF13 and RF21 together, and finally RF22.
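The ordering in the last point amounts to reading every record in order, across file and folder boundaries, and cutting that stream into groups of two. As a minimal sketch, assuming an in-memory stand-in where each file is just a Seq of record ids (ExpectedBatches, folders and batches are hypothetical names), Scala's Iterator.grouped produces exactly those batches without any var:

```scala
// Hypothetical in-memory stand-in for the folder/file/record structure:
// each inner Seq is one file's records, each middle Seq is one folder.
object ExpectedBatches {
  val folders: Seq[Seq[Seq[String]]] = Seq(
    Seq(Seq("RF11", "RF12", "RF13"), Seq("RF21", "RF22"))
  )

  // Walk folders -> files -> records lazily, in order, then cut the record
  // stream into batches of `size`; the last batch may be smaller.
  def batches(size: Int): List[List[String]] =
    folders.iterator
      .flatMap(_.iterator) // folders -> files
      .flatMap(_.iterator) // files -> records
      .grouped(size)
      .map(_.toList)
      .toList
}
```

This is not tail recursion, but because iterators are lazy, records are pulled file by file as batches are formed rather than all up front. It can also serve as a reference to test a tail-recursive version against.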

I do not want to use any var in my code. Can this be solved using tail recursion?
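On the second point above: @tailrec only requires that the method's calls to itself are in tail position; it places no restriction on where the method is called from, so calling it inside a lambda passed to map compiles. A minimal sketch with hypothetical names (TailrecFromMap, countRecords), using plain strings as records:

```scala
import scala.annotation.tailrec

object TailrecFromMap {
  // @tailrec checks only that the recursive self-call is in tail position.
  // Methods of an object are effectively final, so the annotation is accepted.
  @tailrec
  def countRecords(files: List[List[String]], acc: Int): Int = files match {
    case Nil          => acc
    case head :: tail => countRecords(tail, acc + head.size)
  }

  // Calling the tail-recursive method from inside map is fine.
  val folders: List[List[List[String]]] =
    List(List(List("RF11", "RF12", "RF13"), List("RF21", "RF22")))
  val perFolder: List[Int] = folders.map(files => countRecords(files, 0))
}
```

If the compiler does reject the annotation, a common cause is that the method is a public, non-final class member (which could be overridden); defining it in an object, or marking it private or final, avoids that. The harder part is carrying leftover records from one folder to the next, which map alone cannot do without a var.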


Solution

  • You have two subproblems here:

    1. How to send batches of a fixed size
    @scala.annotation.tailrec
    def sendBatch(file: Option[File], buffer: Seq[(ByteBuffer, String)], numberOfRecordsToSend: Int): Seq[(ByteBuffer, String)] = {
      if (buffer.length < numberOfRecordsToSend) {
        // case 1: too few records to send
        file match {
          // case 1.1: the file has not been read yet, so add its records to the buffer
          // (getByteData is assumed to read f and transform its records, as
          // readFileData and transformDataAndConvertToByteBuffer do in the question)
          case Some(f) => sendBatch(None, buffer ++ getByteData(f), numberOfRecordsToSend)
          // case 1.2: too few records and the file was already read: return the leftover records
          case None => buffer
        }
      } else {
        // case 2: we can send numberOfRecordsToSend records to Kinesis
        val (toSend, newBuffer) = buffer.splitAt(numberOfRecordsToSend)
        sendToKinesis(streamName, toSend)
        sendBatch(file, newBuffer, numberOfRecordsToSend)
      }
    }
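To check the logic of this sendBatch without AWS, here is a runnable sketch of the same function under stated assumptions: records are plain Strings, a file is simply the Seq of its records, getByteData returns that Seq, and the Kinesis call is passed in as a send function. FixedSizeBatches and send are hypothetical names, not part of the original code.

```scala
import scala.annotation.tailrec

object FixedSizeBatches {
  // Hypothetical stand-ins: a "file" is just its records, and reading it
  // returns them. Real code would produce Seq[(ByteBuffer, String)].
  type File = Seq[String]
  def getByteData(f: File): Seq[String] = f

  // Same shape as sendBatch above, with the Kinesis call replaced by `send`.
  @tailrec
  def sendBatch(
      file: Option[File],
      buffer: Seq[String],
      numberOfRecordsToSend: Int,
      send: Seq[String] => Unit
  ): Seq[String] =
    if (buffer.length < numberOfRecordsToSend) {
      file match {
        // Not enough records yet and the file is unread: read it into the buffer.
        case Some(f) => sendBatch(None, buffer ++ getByteData(f), numberOfRecordsToSend, send)
        // Not enough records and nothing left to read: return the leftovers.
        case None => buffer
      }
    } else {
      // At least one full batch is buffered: send it and continue with the rest.
      val (toSend, rest) = buffer.splitAt(numberOfRecordsToSend)
      send(toSend)
      sendBatch(file, rest, numberOfRecordsToSend, send)
    }
}
```

Called with File1 (3 records) and an empty buffer, it sends RF11 and RF12 and returns RF13 as the leftover, which the caller passes on together with the next file.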
    
    2. How to iterate over the folders and files while sending batches of a fixed size
    // Start with an empty buffer. For each folder, feed its files to sendBatch one by one;
    // sendBatch sends as many full batches as it can and returns the leftover records,
    // which are carried over to the next file, and from a folder's last file to the next folder.
    val numberOfRecordsToSend = 2
    val partial = listOfFolders.foldLeft(Seq.empty[(ByteBuffer, String)]) { case (acc, folder) =>
      fetchAllFilesFromFolder(folder).foldLeft(acc) { case (acc2, file) =>
        sendBatch(Some(file), acc2, numberOfRecordsToSend)
      }
    }

    // if any records are left over, send them too
    if (partial.nonEmpty) {
      sendToKinesis(streamName, partial)
    }
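The two pieces can also be merged into a single tail-recursive loop over all files of all folders, with no var and no fold. A sketch under the same assumptions as before (records are Strings, a file is a list of records, sending is a callback); WholePipeline, loop and sendAll are hypothetical names:

```scala
import scala.annotation.tailrec

object WholePipeline {
  // Hypothetical stand-ins; real records would be (ByteBuffer, String) pairs.
  type Record = String
  type File = List[Record]
  type Folder = List[File]

  // One tail-recursive loop over every file, carrying leftover records
  // between files (and therefore between folders) in `buffer`.
  @tailrec
  def loop(files: List[File], buffer: List[Record], batchSize: Int, send: List[Record] => Unit): Unit =
    if (buffer.length >= batchSize) {
      // A full batch is buffered: send it and keep the rest.
      val (toSend, rest) = buffer.splitAt(batchSize)
      send(toSend)
      loop(files, rest, batchSize, send)
    } else files match {
      // Buffer too small: add the next file's records.
      case next :: remaining => loop(remaining, buffer ++ next, batchSize, send)
      // No files left: flush whatever is still buffered.
      case Nil => if (buffer.nonEmpty) send(buffer)
    }

  def sendAll(folders: List[Folder], batchSize: Int, send: List[Record] => Unit): Unit = {
    // A non-positive size would make splitAt send empty batches forever.
    require(batchSize > 0, "batchSize must be positive")
    loop(folders.flatten, Nil, batchSize, send)
  }
}
```

For Folder1 from the question with a batch size of 2, send receives RF11, RF12, then RF13, RF21, then RF22. Compared with the nested foldLeft, this flattens the folders' file lists up front, while still adding one file's records to the buffer at a time.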
    

    Hopefully you got the idea.