Rx programming: how to combine each item with the previous element in a single observable?

If we have the observable:

1 -> 2 -> 3 -> 4 -> 5 -> ...

how can we construct the new observable:

(1, 2) -> (3, 4) -> ...

Maybe the question is too short, but I really can't find how to achieve this. Thank you.

Thanks everyone, I found a method, and I am considering how to remove the var:

import java.util.concurrent.TimeUnit

import rx.lang.scala.{Subject, Observable}

import scala.concurrent.duration.Duration
object ObservableEx {
  implicit class ObservableImpl[T](src: Observable[T]) {
    /**
     * 1 -> 2 -> 3 -> 4 ->...
     * (1,2) -> (3,4) -> ...
     */
    def pair: Observable[(T, T)] = {
      val sub = Subject[(T, T)]()
      var former: Option[T] = None //help me to kill it
      src.subscribe(
        x => {
          if (former.isEmpty) {
            former = Some(x) // first element of a pair: hold it
          }
          else {
            sub.onNext((former.get, x)) // second element: emit the pair
            former = None
          }
        },
        e => sub.onError(e),
        () => sub.onCompleted()
      )
      sub
    }
  }
}


object Test extends App {
  import ObservableEx._
  val pair = Observable.interval(Duration(1L, TimeUnit.SECONDS)).pair
  pair.subscribe(x => println("1 - " + x))
  pair.subscribe(x => println("2 - " + x))
  Thread.sleep(10000) // keep the JVM alive long enough to see some output
}


I didn't like var at all, thanks again!

FINALLY I found a lightweight way; I hope it can help others.

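 import scala.concurrent.Promise
 import scala.concurrent.ExecutionContext.Implicits.global

 // Note: every call to pendingPair subscribes to src again, so the sequence
 // only continues where it left off when src is a hot observable.
 def pairPure[T](src: Observable[T]): Observable[(T, T)] = {
   def pendingPair(former: Option[T], sub: Subject[(T, T)]): Unit = {
     val p = Promise[Unit]
     val subscription = src.subscribe(
       x => {
         if (former.isEmpty) {
           pendingPair(Some(x), sub)
         }
         else {
           sub.onNext((former.get, x))
           pendingPair(None, sub)
         }
         p.trySuccess(()) // this subscription has handled its one element
       },
       e => sub.onError(e),
       () => sub.onCompleted()
     )
     // Once the element has been handled, drop this subscription.
     p.future.map{x => subscription.unsubscribe()}
   }

   val sub = Subject[(T,T)]()
   pendingPair(None, sub)
   sub
 }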
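
The idea behind pairPure, passing the pending element along as a value instead of mutating a var, can also be sketched as a scan over a small state. What follows is only an illustration on plain Scala collections (scanLeft), not RxScala code. The names PairStateSketch, State, step and pairs are hypothetical, and I am assuming the same step function could be handed to an Observable's scan.

```scala
object PairStateSketch {
  // State carried from one element to the next:
  //   _1 = the held first half of a pair, if any
  //   _2 = the pair completed by the latest element, if any
  type State[T] = (Option[T], Option[(T, T)])

  // Pure transition: no var, the pending element lives in the state.
  def step[T](state: State[T], x: T): State[T] = state._1 match {
    case None         => (Some(x), None)           // hold x, nothing to emit yet
    case Some(former) => (None, Some((former, x))) // emit the completed pair
  }

  // Run the transition over a list and keep only the completed pairs.
  def pairs[T](xs: List[T]): List[(T, T)] = {
    val init: State[T] = (None, None)
    xs.scanLeft(init)((s, x) => step(s, x)).flatMap(s => s._2)
  }
}
```

A trailing unpaired element simply stays in the state and is never emitted, which matches the behaviour of the var version.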

The other answers are also very helpful~


  • You can use tumblingBuffer with count = 2 to get an Observable of Seqs of length 2, and using map, you can turn them into pairs:

    implicit class ObservableImpl[T](src: Observable[T]) {
      def pair: Observable[(T, T)] = {
        def seqToPair(seq: Seq[T]): (T, T) = seq match {
          case Seq(first, second) => (first, second)
        }
        src.tumblingBuffer(2).map(seqToPair)
      }
    }

    Note that this will fail with a MatchError if the number of elements in the source Observable is odd (the last buffer then holds a single element), so you will have to cover this case in seqToPair.
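
    One possible way to cover the odd case is to let the helper return an Option and drop the incomplete last buffer. This is a minimal sketch on plain Scala collections, where grouped(2) stands in for the buffers that tumblingBuffer(2) would emit. OddSafePairing and seqToPairOption are hypothetical names, and dropping the leftover element is an assumption about what you want; you might prefer to signal an error instead.

```scala
object OddSafePairing {
  // Total version of seqToPair: a buffer of exactly two elements becomes a
  // pair, anything else (a trailing single element, an empty buffer) is None.
  def seqToPairOption[T](seq: Seq[T]): Option[(T, T)] = seq match {
    case Seq(first, second) => Some((first, second))
    case _                  => None
  }

  // Plain collections standing in for the buffers tumblingBuffer(2) would emit.
  val buffers: List[Seq[Int]] = List(1, 2, 3, 4, 5).grouped(2).toList

  // Keep only the complete pairs; the leftover 5 is dropped.
  val pairs: List[(Int, Int)] = buffers.flatMap(b => seqToPairOption(b))
}
```

    On the Observable itself this would presumably look like src.tumblingBuffer(2).map(b => seqToPairOption(b)).collect { case Some(p) => p }, but I have not tested that line.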