rx-java, reactive-programming, rx-kotlin

Applying an operation to all previously emitted items


I have a queue of items that will be sent once the server is reachable:
val queueHistory: Observable<QueuedItem>

A QueuedItem is:
data class QueuedItem(val item: Item, val sent: Boolean = false)

The queueHistory never completes. It just records when an item is queued to be sent, onNext(QueuedItem(item1, false)), and later records that it was sent, onNext(QueuedItem(item1, true)).

What I want to do is get a current count of how many unsent items there are.

The main source of my trouble is that the stream never completes. I was initially thinking of using collect, but that needs a completed stream.

I was playing around with using scan with something like
queueHistory.scan { items: ScannedItems, item -> ScannedItems(arrayOf(*items, item), 0) }
That way I could keep the current list of items I have encountered so far, but scan wants everything to be the same type.

Another idea I had was

queueHistory
            .groupBy { it.item }
            .flatMapSingle { it.toList() }
            .map { it.size % 2 }

But toList() needs a finite list.

Any ideas would be appreciated!


Solution

  • There is a #scan overload that takes a seed value and a lambda with two parameters (prev, current). The prev parameter has the same type as the seed; the current parameter has the same type as the upstream items.

    Example

    // Imports assume RxJava 3 and JUnit 5; adjust the packages for other versions.
    import io.reactivex.rxjava3.subjects.PublishSubject
    import org.junit.jupiter.api.Test

    class So65587608 {
        @Test
        fun `65587608`() {
            val producer = PublishSubject.create<QueuedItem>()

            // The seed is an immutable empty list and every step returns a new list,
            // so no mutable state is shared between subscribers or emissions.
            val unsent = producer.scan(emptyList<QueuedItem>()) { list, curr ->
                when {
                    // sent = true -> remove the item from the unsent list
                    curr.sent -> list.filter { it.item != curr.item }
                    // already queued -> leave the list unchanged
                    list.any { it.item == curr.item } -> list
                    // newly queued -> append
                    else -> list + curr
                }
            }

            unsent.subscribe {
                println(it)
            }

            producer.onNext(QueuedItem("1"))
            producer.onNext(QueuedItem("1", true))
            producer.onNext(QueuedItem("2", true))
            producer.onNext(QueuedItem("3", true))
            producer.onNext(QueuedItem("4"))
            producer.onNext(QueuedItem("5"))
            producer.onNext(QueuedItem("4", true))
        }

        data class QueuedItem(val item: String, val sent: Boolean = false)
    }
    

    Output

    [] // seed value
    [QueuedItem(item=1, sent=false)]
    []
    []
    []
    [QueuedItem(item=4, sent=false)]
    [QueuedItem(item=4, sent=false), QueuedItem(item=5, sent=false)]
    [QueuedItem(item=5, sent=false)]
    

    Note

    The accumulator must return a new list on each #scan iteration rather than re-emit the same mutable instance. Alternatively, use an immutable list from a persistent collections library.

    Furthermore, this is probably not a good implementation as it stands: the list is unbounded and could eventually exhaust memory. If the list grows large, the linear search will also become slow. It is worth thinking about how the lookup could be done more efficiently.
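
One possible way to address the lookup concern and get the count the question asks for is to accumulate a set of unsent item keys instead of a list, then map it to its size. This is only a minimal sketch, assuming RxJava 3 and String item keys as in the example above; `unsentCount` is a hypothetical helper name, not part of any library.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

data class QueuedItem(val item: String, val sent: Boolean = false)

// Hypothetical helper: emits the number of unsent items after each event.
fun unsentCount(history: Observable<QueuedItem>): Observable<Int> =
    history
        // The seed is an immutable empty set. Each step returns a new set,
        // so nothing is shared between subscribers or emissions.
        .scan(emptySet<String>()) { unsent, curr ->
            if (curr.sent) unsent - curr.item else unsent + curr.item
        }
        .map { it.size }
        // Skip emissions where the count did not change.
        .distinctUntilChanged()

fun main() {
    val producer = PublishSubject.create<QueuedItem>()
    unsentCount(producer).subscribe { println("unsent: $it") }

    producer.onNext(QueuedItem("1"))
    producer.onNext(QueuedItem("1", true))
    producer.onNext(QueuedItem("4"))
    producer.onNext(QueuedItem("5"))
    producer.onNext(QueuedItem("4", true))
}
```

Membership checks on the set are hash-based rather than linear, and the set only holds items that are still unsent. Note that `+` and `-` on a Kotlin `Set` still copy the set on every step, so for a large backlog a persistent set implementation would likely be a better fit.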