I have a method that fetches elements from different sources by polling them. How can I create a Source from this method so that it polls for a new element only when there is a request for one?
Integrating with polling APIs is explained in the akka.io blog post "Writing Akka Streams Connectors for existing APIs", in the section "Polling based APIs".
At its core, you'll want to extend TimerGraphStageLogic and do something like:
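// Arm a one-shot timer; onTimer() fires once pollInterval has elapsed.
private void schedulePoll() {
    scheduleOnce("poll", pollInterval);
}

@Override
public void onTimer(Object timerKey) {
    // Skip the poll entirely if downstream has already cancelled.
    if (!isClosed(out)) {
        doPoll(); // expected to add any fetched elements to buffer
        if (!buffer.isEmpty()) {
            pushHead(); // emit the first buffered element downstream
        } else {
            schedulePoll(); // nothing available yet, try again later
        }
    }
}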
to schedule the polling.
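To see why this polls only when an element is requested, the control flow can be modelled without Akka. The following is a minimal plain-Java sketch, not the Akka API: `DemandDrivenPoller`, `pull`, `pollCount`, and the `Supplier<List<T>>` poll function are hypothetical names introduced for illustration, and the timer is replaced by a bounded retry loop (an assumption made to keep the example self-contained).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Plain-Java model of the stage's control flow; not an Akka class. */
final class DemandDrivenPoller<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    // Hypothetical stand-in for the asker's poll method.
    private final Supplier<List<T>> pollSource;
    private int pollCount = 0;

    DemandDrivenPoller(Supplier<List<T>> pollSource) {
        this.pollSource = pollSource;
    }

    /** Called only when the consumer asks for an element (the analogue of onPull). */
    Optional<T> pull(int maxAttempts) {
        // Serve from the buffer first; poll only while it is empty.
        for (int attempt = 0; buffer.isEmpty() && attempt < maxAttempts; attempt++) {
            pollCount++;
            buffer.addAll(pollSource.get()); // analogue of doPoll()
        }
        // Analogue of pushHead(); empty if every attempt returned nothing.
        return Optional.ofNullable(buffer.pollFirst());
    }

    /** Number of times the underlying source has been polled so far. */
    int pollCount() {
        return pollCount;
    }
}
```

In this model the underlying source is never touched until `pull` is called, and a batch that returned several elements is drained from the buffer before the next poll. In the real stage, the retry loop corresponds to rescheduling the timer, so no thread is blocked while waiting.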
Alternatively, you could stick to implementing all the callbacks within Source.unfoldResourceAsync.
A full implementation of such a stage can be found here (implementation in Java).