Search code examples
scalaakkaakka-streamakka-http

Custom version of fileUpload directive fails to materialize


When a user uploads a file to my web service, I'd like to also collect the non-binary fields of the POST request, because they contain metadata about the uploaded file. So I modified akka-http's `fileUpload` directive as follows:

// Variant of akka-http's `fileUpload` directive that also collects plain-text form fields.
// Provides (text fields by name, file metadata, file byte stream) for the file part named
// `fieldName`, or rejects with MissingFormFieldRejection(fieldName) when no such part is found.
//
// NOTE(review): this version fails at runtime with the SubscriptionTimeoutException shown below.
// `fold` only emits once the *whole* parts stream has completed. However, the matching file
// part's `dataBytes` is merely stored in the accumulator, not consumed, and ignored parts (the
// final `else`) are never drained. All parts come from one continuous stream, so each part must
// be consumed before the next is delivered. The pending file substream is therefore not
// materialized in time.
def fileUpload3(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
  entity(as[Multipart.FormData]).flatMap { formData ⇒
    extractRequestContext.flatMap { ctx ⇒
      implicit val mat: Materializer = ctx.materializer

      val fut =
        // Accumulator: (text fields collected so far, the matching file part once seen).
        formData.parts.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
          if (part.filename.nonEmpty && part.name == fieldName) {
            // The file's bytes are kept as an unconsumed Source (see NOTE above).
            fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
          } else if (part.filename.isEmpty && part.entity.contentType.mediaType == MediaTypes.`text/plain` && part.entity.isInstanceOf[HttpEntity.Strict]) {
            // Only strict text/plain parts without a filename are read as fields (decoded as UTF-8).
            fields.updated(part.name, part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String) → pairOpt
          } else {
            // Ignored part: its bytes are not discarded here.
            fields → pairOpt
          }
        }
          // Keep the result only if a matching file part was found.
          .collect {
            case (fields, Some((info, stream))) ⇒
              (fields, info, stream)
          }
          // None when the stream completes without a matching file part.
          .runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])

      onSuccess(fut)
    }
  }.flatMap {
    case Some(tuple) ⇒ provide(tuple)
    case None ⇒ reject(MissingFormFieldRejection(fieldName))
  }

Although I don't see much difference from the original code, it fails with the following exception when I use it:

akka.stream.AbruptIOTerminationException: Stream terminated without completing IO operation.
Caused by: akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds
    at akka.stream.impl.fusing.SubSource.timeout(StreamOfStreams.scala:746)

What am I missing?


Solution

  • I didn't realize it at first, but all fields arrive in a single continuous stream. Because of that, we can't set one field aside as a `Source[T]` for later streaming while continuing to read the fields after it, even though Akka Streams lets us write code that tries to do so.

    Thus every part of the multipart request must be fully consumed (read or discarded) before the next one can be processed.

    Also note that the following function collects only the text fields that come before the binary file part.

    /** Extracts the file part named `fieldName` as a stream, together with the textual form fields
      * that precede it in the multipart body.
      *
      * All parts arrive as one continuous stream, so every part before the file must be fully
      * consumed (read or discarded) before the next one is delivered; otherwise the pending
      * substream times out with `SubscriptionTimeoutException`. Parsing therefore stops at the
      * file part, and text fields sent AFTER the file are not collected.
      *
      * Text fields (no filename, textual media type) are buffered in memory whether the parser
      * delivered them as strict or streamed entities, and are decoded with their declared charset
      * (UTF-8 if none is declared).
      *
      * @param fieldName name of the form field carrying the file
      * @return directive providing (text fields by name, file metadata, unconsumed file bytes);
      *         rejects with `MissingFormFieldRejection(fieldName)` when no such file part exists
      */
    def fileUploadWithFields(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
      entity(as[Multipart.FormData]).flatMap { formData ⇒
        extractRequestContext.flatMap { ctx ⇒
          import scala.concurrent.{ ExecutionContext, Future }
          import scala.concurrent.duration._

          implicit val mat: Materializer = ctx.materializer
          implicit val ec: ExecutionContext = ctx.executionContext

          // Upper bound for buffering one (possibly chunked) text field into memory.
          val textFieldTimeout = 5.seconds

          type Acc = (Map[String, String], Option[(FileInfo, Source[ByteString, Any])])

          def isRequestedFile(part: Multipart.FormData.BodyPart): Boolean =
            part.filename.isDefined && part.name == fieldName

          // Because it's continuous stream of fields we MUST consume each field before switching to next one. [https://stackoverflow.com/q/52765993/226895]
          val fut = formData.parts
            // Stop right after the requested file: its bytes are handed to the caller unread.
            .takeWhile(part ⇒ !isRequestedFile(part), inclusive = true)
            .foldAsync[Acc]((Map.empty[String, String], None)) { case ((fields, pairOpt), part) ⇒
              part.filename match {
                case Some(fileName) if part.name == fieldName ⇒
                  Future.successful(fields → Some((FileInfo(part.name, fileName, part.entity.contentType), part.entity.dataBytes)))
                case None if part.entity.contentType.mediaType.isText ⇒
                  // Depending on chunk boundaries a text field may be Strict or streamed;
                  // toStrict handles both (and completes immediately for Strict entities).
                  part.entity.toStrict(textFieldTimeout).map { strict ⇒
                    val charset = strict.contentType.charsetOption
                      .fold(java.nio.charset.StandardCharsets.UTF_8: java.nio.charset.Charset)(_.nioCharset)
                    fields.updated(part.name, strict.data.decodeString(charset)) → pairOpt
                  }
                case _ ⇒
                  // Unwanted part: drain it so the parser can emit the next one.
                  part.entity.discardBytes()
                  Future.successful(fields → pairOpt)
              }
            }
            // Keep the result only if the requested file part was found.
            .collect { case (fields, Some((info, stream))) ⇒ (fields, info, stream) }
            .runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])

          onSuccess(fut)
        }
      }.flatMap {
        case Some(tuple) ⇒ provide(tuple)
        case None ⇒ reject(MissingFormFieldRejection(fieldName))
      }