I have a simple setup to a problem but the solution seems to be more complicated.
Setup: I have a hot observable which originates from a scanner that will emit every number as a different element and an R
when a code is complete.
Problem: From this I want a hot observable that emits every full code as 1 element.
I tried playing around with different flatMap
, takeUntil
and groupBy
operators but haven't been able to come to a solution.
You can use the buffer operator.
PublishSubject<Token<Integer>> s = PublishSubject.create();
Observable<Token<Integer>> markers = s.filter(x->x.isMarker());
s.buffer(markers).subscribe(
v->{
Optional<Integer> reduce = v.stream()
.filter(t->!t.isMarker())
.map(t->(ValueToken<Integer>)t)
.map(ValueToken::get)
.reduce((a,b)->a+b);
reduce.ifPresent(System.out::println);
}
);
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17
s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet
I made a class to wrap both values and markers in the flow.
public abstract class Token<T> {
private static final MarkerToken MARKER = new MarkerToken<>();
public boolean isMarker() {
return false;
}
public static <T> MarkerToken<T> marker() {
return MARKER;
}
public static <T> ValueToken<T> value(T o) {
return new ValueToken<>(o);
}
public static class ValueToken<T> extends Token<T> {
T value;
public ValueToken(T value) {
this.value = value;
}
public T get() {
return value;
}
}
public static class MarkerToken<T> extends Token<T> {
public boolean isMarker() {
return true;
}
}
}
The previous method would emit also on the closing of the stream, with this solution you can emit only complete buffers.
The message class function as an accumulator, it will accumulate tokens until closing marker is accumulated.
When this happens the next message will start from scratch.
The presence of the closing mark as last element marks the message as complete.
public static class Message<T> {
List<Token<T>> tokens = new ArrayList<>();
public Message<T> append(Token<T> t) {
Message<T> mx = new Message<T>();
if(!isComplete()) {
mx.tokens.addAll(tokens);
}
mx.tokens.add(t);
return mx;
}
public boolean isComplete() {
int n = tokens.size();
return n>0 && tokens.get(n-1).isMarker();
}
public Optional<List<Token<T>>> fullMessage(){
return isComplete() ? Optional.of(tokens):Optional.empty();
}
}
Scanning the source you emit a message for each token emitted, then you filter out incomplete message and emit just the one marked as complete.
s.scan(new Message<Integer>(), (a, b) -> a.append(b))
.filter(Message::isComplete)
.map(Message::fullMessage)
.map(Optional::get).subscribe(v -> {
System.out.println(v);
});
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker());// [V(12), V(13), MARKER]
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // [V(10), V(7), MARKER]
s.onNext(value(10));
s.onNext(value(127));
s.onComplete(); // Not emitting incomplete messages on the closing of the subject.