Tags: java, akka, akka-stream

How to implement a fault-tolerant file upload with Akka Remote and Akka Streams


I'm an Akka beginner (I'm using Java).

I'm making a file transfer system using Akka.

So far, I have implemented sending a file from Actor1 (local) to Actor2 (remote).

Now I'm thinking about how to handle problems that occur during a file transfer, and I have the following question.

Suppose I lose my network connection while a file is being transferred, and the transfer fails at 90% complete. A few minutes later, the network connection is restored.

Is it possible to transfer only the rest of the file data (the remaining 10%)?

If so, please give me some advice.

Here is my simple code. Thanks :)

Actor1 (Local)

private Behavior<Event> onTick() {
    ....
    String fileName = "test.zip";
    Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
    logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
    SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
    getContext().ask(
            Receiver.FileTransfered.class,
            selectedReceiver,
            timeout,
            responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
            (response, failure) -> {
                if (response != null) {
                    return new TransferCompleted(fileName, response.transferedSize);
                } else {
                    return new JobFailed("Processing timed out", fileName);
                }
            }
    );
}

Actor2 (Remote)

public static Behavior<Command> create() {
    return Behaviors.setup(context -> {
        ...
        Materializer mat = Materializer.createMaterializer(context);
        return Behaviors.receive(Command.class)
                .onMessage(TransferFile.class, command -> {
                    command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
                    command.replyTo.tell(new FileTransfered("filename", 1024));
                    return Behaviors.same();
                }).build();
    });
}

Solution

  • To implement a fault-tolerant file transfer properly, you need to think about the following:

    1. How to identify that a transfer has to be resumed for a given file.
    2. How to find the point from which to resume the transfer.
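For point 1, a bare file name can collide. One option is sketched below in plain Java. It assumes that the sender's node address plus the absolute source path uniquely identify a transfer, and hashes both into a stable transfer ID that the receiver could use as the destination file name. `TransferKey`, `transferId` and the address format in the example are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class TransferKey {
    // Hypothetical helper: derives a stable transfer ID from the sender's node
    // address (e.g. "akka://system@host:2552") and the absolute source path.
    static String transferId(String senderAddress, String absolutePath) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            // A NUL separator keeps ("a", "bc") and ("ab", "c") distinct.
            String key = senderAddress + '\0' + absolutePath;
            byte[] digest = sha.digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString(); // 64 hex characters
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java SE platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Because the ID is deterministic, the same file re-sent from the same node after a network failure gets the same ID. That lets the receiver locate its partial copy again.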

    The following implementation makes very simple assumptions about 1 and 2:

    1. The file name is unique and can therefore be used to identify a transfer. Strictly speaking, this is not true: for example, you can transfer files with the same name from different folders, or from different nodes. You will have to adjust this to your use case.
    2. All writes on the receiver side are assumed to have written their bytes correctly, so the total number of written bytes indicates the point from which to resume the transfer. If this cannot be guaranteed, you need to logically split the original file into chunks and send the hash, size and position of each chunk to the receiver. The receiver then has to validate the chunks on its side and find the correct pointer for resuming the transfer.
    3. (That's a bit more than 2 :) ) This implementation does not detect transfer problems; it focuses on 1 and 2 instead.
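The chunk-based validation from point 2 could look roughly like the sketch below. It is plain Java and assumes that both sides agree on a fixed chunk size and that the sender transmits one SHA-256 hash per chunk. `ChunkResume`, `chunkHashes` and `resumeOffset` are hypothetical names. The receiver hashes its partial file with the same chunk size and resumes right after the longest prefix of chunks whose hashes match.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkResume {
    // Sender side: SHA-256 of each chunkSize-byte chunk (the last one may be shorter).
    static List<byte[]> chunkHashes(Path file, int chunkSize)
            throws IOException, NoSuchAlgorithmException {
        List<byte[]> hashes = new ArrayList<>();
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[chunkSize];
            while (true) {
                int n = in.readNBytes(buf, 0, chunkSize); // Java 9+
                if (n == 0) break;                        // no more data
                MessageDigest sha = MessageDigest.getInstance("SHA-256");
                sha.update(buf, 0, n);
                hashes.add(sha.digest());
                if (n < chunkSize) break;                 // short chunk = end of file
            }
        }
        return hashes;
    }

    // Receiver side: byte offset right after the last locally valid chunk.
    static long resumeOffset(Path partial, List<byte[]> senderHashes, int chunkSize)
            throws IOException, NoSuchAlgorithmException {
        if (!Files.exists(partial)) return 0L;
        long size = Files.size(partial);
        List<byte[]> local = chunkHashes(partial, chunkSize);
        long offset = 0L;
        for (int i = 0; i < local.size() && i < senderHashes.size(); i++) {
            if (!Arrays.equals(local.get(i), senderHashes.get(i))) break;
            // A matching chunk is complete; the last one may be shorter than chunkSize.
            offset = Math.min(offset + chunkSize, size);
        }
        return offset;
    }
}
```

Before appending, the receiver would also have to cut its file back to the returned offset (for example with `FileChannel.truncate`). Otherwise a corrupted or incomplete tail would remain in the file.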

    The code:

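    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors
    import akka.stream.{Materializer, SourceRef, SystemMaterializer}
    import akka.stream.scaladsl.{FileIO, StreamRefs}
    import akka.util.ByteString
    import java.nio.file.{Paths, StandardOpenOption}
    
    object Sender {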
      sealed trait Command
      case class Upload(file: String) extends Command
      case class StartWithIndex(file: String, index: Long) extends Sender.Command
    
      def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
        implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
        Behaviors.receiveMessage {
          case Upload(file) =>
            receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
            ctx.log.info(s"Initiating upload of $file")
            Behaviors.same
          case StartWithIndex(file, startWith) =>
            // Read the file starting at the byte offset reported by the receiver
            val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, startPosition = startWith)
            val ref = source.runWith(StreamRefs.sourceRef())
            ctx.log.info(s"Starting upload of $file")
            receiver.tell(Receiver.Upload(file, ref))
            Behaviors.same
        }
      }
    }
    
    object Receiver {
      sealed trait Command
    
      case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command
    
      case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command
    
      val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
        implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
        Behaviors.receiveMessage {
          case InitUpload(path, replyTo) =>
            val file = fileAtDestination(path)
            val index = if (file.exists()) file.length else 0
            ctx.log.info(s"Got init command for $file at pointer $index")
            replyTo.tell(Sender.StartWithIndex(path, index.toLong))
            Behaviors.same
          case Upload(path, fileSource) =>
            val file = fileAtDestination(path)
            val sink = if (file.exists()) {
              FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
            } else {
              FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
            }
            ctx.log.info(s"Saving file into ${file.toPath}")
            fileSource.runWith(sink)
            Behaviors.same
        }
      }
    }
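Since the question is in Java, here is the same resume logic with plain `java.nio` and no Akka. It is a minimal sketch for checking the offset arithmetic before wiring it into actors. `ResumableCopy`, `startIndex` and `copyFrom` are hypothetical names. In the actor version, the bytes would flow through the `SourceRef` instead of a direct file-to-file copy.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ResumableCopy {
    // Receiver side (like InitUpload): resume from the number of bytes already stored.
    static long startIndex(Path destination) throws IOException {
        return Files.exists(destination) ? Files.size(destination) : 0L;
    }

    // Sender side (like StartWithIndex) plus receiver side (like Upload): read the
    // source from startIndex and append it to the destination, creating it if needed.
    // Returns the number of bytes transferred in this call.
    static long copyFrom(Path source, Path destination, long startIndex) throws IOException {
        long written = 0L;
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(destination, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            in.position(startIndex);                     // skip what the receiver already has
            ByteBuffer buf = ByteBuffer.allocate(8192);  // same chunk size as the Scala code
            while (in.read(buf) != -1) {
                buf.flip();
                while (buf.hasRemaining()) {
                    written += out.write(buf);
                }
                buf.clear();
            }
        }
        return written;
    }
}
```

The Scala sender's `FileIO.fromPath(..., startPosition = ...)` plays the role of `in.position(startIndex)` here. In the Akka Java DSL, the corresponding overload should be `FileIO.fromPath(path, chunkSize, startPosition)`. The usage mirrors the answer's test code: copy once, append more data to the source, then call `copyFrom` again with the new `startIndex`. Only the appended bytes are transferred the second time.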
    
    

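    Some auxiliary methods:

    import java.io.{File, FileOutputStream}
    import java.nio.file.Files
    import scala.util.Random
    
    // Temporary directory that plays the role of the receiver's storage
    val destination: File = Files.createTempDirectory("destination").toFile
    
    // Maps an uploaded file to its location in the destination directory (by name only)
    def fileAtDestination(file: String): File = {
      val name = new File(file).getName
      new File(destination, name)
    }
    
    // Appends `size` random printable characters to `file`
    def writeRandomToFile(file: File, size: Int): Unit = {
      val out = new FileOutputStream(file, true)
      try {
        (0 until size).foreach { _ =>
          out.write(Random.nextPrintableChar())
        }
      } finally out.close()
    }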
    
    

    And finally, some test code:

    import java.nio.file.{Files, Path}
    
    // Sender and receiver bootstrapping is omitted
    
    // Create a dummy file to upload
    val file: Path = Files.createTempFile("test", "test")
    writeRandomToFile(file.toFile, 1000)
    
    // Initiate a new upload
    sender.tell(Sender.Upload(file.toAbsolutePath.toString))
    // Sleep to allow the upload to finish
    Thread.sleep(1000)
    
    // Append more data to the source file to emulate an interrupted transfer:
    // the receiver now holds only the first 1000 bytes of the file
    writeRandomToFile(file.toFile, 1000)
    // Initiate a new upload that resumes at index 1000 and transfers only the new bytes
    sender.tell(Sender.Upload(file.toAbsolutePath.toString))
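The test code does not verify the result. For a local test where both files are on the same machine, a minimal Java check could compare the source with the uploaded copy. This assumes Java 12 or later for `Files.mismatch`, and `UploadCheck` is a hypothetical name. `Files.mismatch` returns -1 for identical files and otherwise the position of the first differing byte. If the shorter file is a prefix of the longer one, it returns the size of the shorter file, which corresponds to the resume pointer from point 2.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class UploadCheck {
    // True if the uploaded copy is byte-for-byte identical to the source (Java 12+).
    static boolean sameContent(Path source, Path uploaded) throws IOException {
        return Files.mismatch(source, uploaded) == -1L;
    }
}
```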
    
    

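    Finally, the whole process can be summarized as:

    1. Sender -> Receiver: InitUpload(file)
    2. Receiver -> Sender: StartWithIndex(file, index), where index is the number of bytes already stored at the destination (0 for a new file)
    3. Sender -> Receiver: Upload(file, sourceRef), where sourceRef streams the file starting at index
    4. The receiver appends the streamed bytes to the destination file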