scala, akka, akka-stream

Create Source from a polling method in Akka


I have a method that fetches elements from different sources by polling them whenever a new element is requested. How can I create a Source from this method so that it polls only when a new element is actually requested?


Solution

  • Integrating with polling APIs is explained in the "Polling based APIs" part of the akka.io blog post Writing Akka Streams Connectors for existing APIs.

    At the core of it you'll want to extend a TimerGraphStageLogic, and do things like:

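    // Arm a one-shot timer; onTimer fires once after pollInterval.
    private void schedulePoll() {
      scheduleOnce("poll", pollInterval);
    }
    
    @Override
    public void onTimer(Object timerKey) {
      // Skip the poll if the outlet is already closed (for example, downstream cancelled).
      if (!isClosed(out)) {
        doPoll();
        if (!buffer.isEmpty()) {
          // The poll returned data: emit the first buffered element.
          pushHead();
        } else {
          // Nothing available yet: try again after another interval.
          schedulePoll();
        }
      }
    }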
    

    to schedule the polling.
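
The snippet above refers to buffer, doPoll() and pushHead() without showing them. As a rough, framework-free sketch of that bookkeeping (not the blog post's actual code), the class below models it in plain Java. PollingBuffer, poller, emit and pollScheduled are hypothetical names: emit stands in for push(out, element), and the pollScheduled flag stands in for a pending scheduleOnce timer.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of the demand-driven polling logic; no Akka types involved.
final class PollingBuffer<T> {
  private final Supplier<List<T>> poller;  // hypothetical wrapper around the polling method
  private final Consumer<T> emit;          // stand-in for push(out, element)
  private final Queue<T> buffer = new ArrayDeque<>();
  private boolean pollScheduled = false;   // stand-in for a pending timer

  PollingBuffer(Supplier<List<T>> poller, Consumer<T> emit) {
    this.poller = poller;
    this.emit = emit;
  }

  // Downstream asked for one element (onPull in a real stage).
  void onPull() {
    if (!buffer.isEmpty()) {
      pushHead();
    } else {
      pollScheduled = true;  // schedulePoll() in a real stage
    }
  }

  // The timer fired (onTimer in a real stage).
  void onTimer() {
    pollScheduled = false;
    buffer.addAll(poller.get());  // doPoll(): may add zero or more elements
    if (!buffer.isEmpty()) {
      pushHead();
    } else {
      pollScheduled = true;  // nothing yet, poll again later
    }
  }

  // Emit exactly one element, matching the single outstanding pull.
  private void pushHead() {
    emit.accept(buffer.remove());
  }

  boolean isPollScheduled() {
    return pollScheduled;
  }

  int buffered() {
    return buffer.size();
  }
}
```

The polling method runs only while there is unmet downstream demand: onPull either serves from the buffer or arms the timer, and onTimer polls and either emits one element or re-arms the timer.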

    Alternatively, you could skip the custom stage and implement the create, read and close callbacks of Source.unfoldResourceAsync.

    A full implementation of such a stage can be found here (in Java).
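
For the unfoldResourceAsync route, one detail matters: the read callback signals end of stream by completing with an empty Optional, so a poll that merely has nothing yet must not be returned as-is. A minimal sketch of a retrying read helper follows, assuming Java 9+ (for CompletableFuture.delayedExecutor) and a non-blocking poll that returns an Optional. PollRetry and pollUntilAvailable are hypothetical names, not part of Akka.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: turns a "maybe empty" poll into a stage that completes
// only once an element is available.
final class PollRetry {
  private PollRetry() {}

  // Polls once; if nothing is available, waits `interval` and polls again.
  // The returned stage never completes with Optional.empty().
  static <T> CompletionStage<Optional<T>> pollUntilAvailable(
      Supplier<Optional<T>> poll, Duration interval) {
    Optional<T> result = poll.get();  // assumed to be cheap and non-blocking
    if (result.isPresent()) {
      return CompletableFuture.completedFuture(result);
    }
    // Executor that runs submitted tasks after the given delay.
    Executor delayed =
        CompletableFuture.delayedExecutor(interval.toMillis(), TimeUnit.MILLISECONDS);
    // Wait out the delay asynchronously, then retry; no thread is blocked meanwhile.
    return CompletableFuture
        .runAsync(() -> { }, delayed)
        .thenCompose(ignored -> pollUntilAvailable(poll, interval));
  }
}
```

With Akka on the classpath, this could serve as the read callback, roughly Source.unfoldResourceAsync(create, client -> PollRetry.pollUntilAvailable(client::poll, interval), close), where create and close return CompletionStages and client.poll() is a hypothetical method on your own polling client. Because read is invoked only when downstream signals demand, no polling happens without a request.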