Search code examples
scalaakka-stream

How to work with Source.Queue in Akka-Stream


I am toying around trying to use a source.queue from an Actor. I am stuck in parttern match the result of an offer operation

class MarcReaderActor(file: File, sourceQueue: SourceQueueWithComplete[Record])  extends Actor {

  val inStream = file.newInputStream
  val reader   = new MarcStreamReader(inStream)

  override def receive: Receive = {

    case Process => {
      if (reader.hasNext()) {
        val record = reader.next()
        pipe(sourceQueue.offer(record)) to self
      }
    }

    case f:Future[QueueOfferResult] => 
    }
  }
}

I don't know how to check if it was Enqueued or Dropped or Failure

if i write f:Future[QueueOfferResult.Enqueued] the compile complain


Solution

  • Since you use pipeTo, you do no need to match on futures - the contents of the future will be sent to the actor when this future is completed, not the future itself. Do this:

    override def receive: Receive = {
      case Process => 
        if (reader.hasNext()) {
          val record = reader.next()
          pipe(sourceQueue.offer(record)) to self
        }
    
      case r: QueueOfferResult => 
        r match {
          case QueueOfferResult.Enqueued =>     // element has been consumed
          case QueueOfferResult.Dropped =>      // element has been ignored because of backpressure
          case QueueOfferResult.QueueClosed =>  // the queue upstream has terminated
          case QueueOfferResult.Failure(e) =>   // the queue upstream has failed with an exception
        }
    
      case Status.Failure(e) =>  // future has failed, e.g. because of invalid usage of `offer()`
    }