Tags: android, kotlin, rx-java2, rx-android, rx-kotlin2

How to make map wait until the current item has finished processing before taking the next item, using RxJava?


I'm trying to save input streams to files. When the user selects one image, everything works. But when the user selects multiple images (for example, four), the code below does not work as expected: I see only two file paths in the log output.


compositeDisposable.add(Observable.fromIterable(inputStreamList)
                .map {
                    FileUtils.saveInputStreamToFile(it, directory, 500)
                }.toList()
                .toObservable()
                .subscribeOn(schedulerProvider.io())
                .subscribe({
                    it.forEach {file->
                        Log.d("TAG", "Path ${file.path}")
                    }
                    Log.d("TAG", "Size ${it.size}")
                }, {

                })
        )

Here's the saveInputStreamToFile method:


fun saveInputStreamToFile(input: InputStream, directory: File, height: Int): File? {
        val currentTime = dateFormat.format(Date())
        val imageName = "_$currentTime"
        val temp = File(directory.path + File.separator + "flab\$file\$for\$processing")
        try {
            val final = File(directory.path + File.separator + imageName + ".$IMAGE_JPG")
            val output = FileOutputStream(temp)
            try {
                val buffer = ByteArray(4 * 1024) // or other buffer size
                var read: Int = input.read(buffer)
                while (read != -1) {
                    output.write(buffer, 0, read)
                    read = input.read(buffer)
                }
                output.flush()
                saveBitmap(decodeFile(temp, height)!!, final.path, IMAGE_JPG, 80)
                return final
            } finally {
                output.close()
                temp.delete()
            }
        } finally {
            input.close()
        }
    }


I want the next input stream to be taken only after the current input stream has been converted to a file. How can I achieve this?


Solution

  • In RxJava you can do something like this:

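        // RxJava 2 classes live in the io.reactivex package.
        import io.reactivex.Observable
        import io.reactivex.Observer
        import io.reactivex.android.schedulers.AndroidSchedulers
        import io.reactivex.disposables.Disposable
        import io.reactivex.schedulers.Schedulers

        Observable.fromCallable {
            // One callable handles the whole list, so each stream is fully
            // saved before the next one is read. A lambda's last expression
            // is its result, so no `return` is needed (or allowed) here.
            inputStreamList.mapNotNull {
                FileUtils.saveInputStreamToFile(it, directory, 500)
            }
        }.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(object : Observer<List<File>> {
                override fun onComplete() {
                }

                override fun onSubscribe(d: Disposable) {
                    // Keep d (e.g. in compositeDisposable) if you need to cancel.
                }

                override fun onNext(files: List<File>) {
                    // Iterate over the saved files here and do your work
                    files.forEach { file ->
                        Log.d("TAG", "Path ${file.path}")
                    }
                    Log.d("TAG", "Size ${files.size}")
                }

                override fun onError(e: Throwable) {
                    // Log failures instead of silently dropping them
                    Log.e("TAG", "Saving input streams failed", e)
                }
            })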
    

    This ensures the work runs sequentially on a background thread (each stream is saved before the next one is read), and you receive the result in onNext() on the main thread, as specified by observeOn.

    Make sure you import the Rx classes from the io.reactivex package (RxJava 2).

    EDIT:

    Remove your existing Rx dependencies and add these to build.gradle:

    implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
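
The claim above that the work inside the callable happens one item at a time can be checked without RxJava or Android. Below is a minimal sketch in plain Kotlin, under a few assumptions: `saveStream` and `saveAll` are hypothetical stand-ins (not the `FileUtils` method from the question), the streams are in-memory byte arrays, and the file name uses the item index. The index-based name is my own choice here; the question's timestamp-based name could in principle repeat if `dateFormat` has coarse resolution, which the question does not show.

```kotlin
import java.io.ByteArrayInputStream
import java.io.File
import java.io.InputStream
import java.nio.file.Files

// Hypothetical stand-in for FileUtils.saveInputStreamToFile: copies the stream
// into a file whose name includes the item index, so names cannot collide.
fun saveStream(input: InputStream, directory: File, index: Int): File {
    val target = File(directory, "image_$index.bin")
    // use {} closes both streams even if copying throws.
    input.use { source ->
        target.outputStream().use { sink -> source.copyTo(sink) }
    }
    return target
}

// List.mapIndexed is eager and sequential: the lambda for the next item is
// not invoked until the lambda for the current item has returned.
fun saveAll(inputs: List<InputStream>, directory: File): List<File> =
    inputs.mapIndexed { index, input -> saveStream(input, directory, index) }

fun main() {
    val directory = Files.createTempDirectory("streams").toFile()
    val inputs = listOf("one", "two", "three", "four")
        .map { ByteArrayInputStream(it.toByteArray()) }

    val files = saveAll(inputs, directory)
    files.forEach { println("Path ${it.path}") }
    println("Size ${files.size}")

    directory.deleteRecursively()
}
```

Run as-is, this prints four distinct paths followed by "Size 4". Wrapping a call like `saveAll(...)` in `Observable.fromCallable { }` with `subscribeOn(Schedulers.io())`, as in the answer, only moves this same sequential loop onto a background thread.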