I'm trying to convert a Flowable into LiveData and observe it in an activity. My Flowable emits values at a constant interval, but the LiveData I convert it to never receives any values in its observer. I have created some sample code to demonstrate the problem.
Activity
public class MyActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_my);
        MyViewModel myViewModel = ViewModelProviders.of(this).get(MyViewModel.class);
        myViewModel.init();
        myViewModel.getListLiveData().observe(this, new Observer<List<String>>() {
            @Override
            public void onChanged(@Nullable List<String> strings) {
                // This callback never gets called
                Timber.d("value received in live data observer: %s", strings);
                for (String string : strings) {
                    Timber.d(string);
                }
            }
        });
    }
}
ViewModel class
static class MyViewModel extends ViewModel {
    LiveData<List<String>> mListLiveData;
    PublishProcessor<String> mStringPublishProcessor = PublishProcessor.create();

    public void init() {
        mListLiveData = LiveDataReactiveStreams.fromPublisher(mStringPublishProcessor.toList().toFlowable());
        // This is to trigger the mStringPublishProcessor at constant intervals
        Observable.interval(0, 5, TimeUnit.SECONDS)
                .map(aLong -> {
                    Timber.d("value emitted: "); // this log is showing as expected
                    mStringPublishProcessor.onNext("Value " + aLong);
                    return aLong;
                }).subscribe();
    }

    public LiveData<List<String>> getListLiveData() {
        return mListLiveData;
    }
}
Now, when I run the activity, I can see only the logs from Observable.interval:
com.example.app D/MyActivity$MyViewModel: value emitted:
com.example.app D/MyActivity$MyViewModel: value emitted:
com.example.app D/MyActivity$MyViewModel: value emitted:
com.example.app D/MyActivity$MyViewModel: value emitted:
com.example.app D/MyActivity$MyViewModel: value emitted:
com.example.app D/MyActivity$MyViewModel: value emitted:
Why does the LiveData observer never receive any values from the Flowable?
As per the documentation of LiveDataReactiveStreams.fromPublisher:
Creates an Observable stream from a ReactiveStreams publisher. When the LiveData becomes active, it subscribes to the emissions from the Publisher.
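.toList() collects every item into a single list and emits that list only after the upstream calls onComplete(). In your example, the PublishProcessor never completes, so the list is never emitted and the LiveData observer is never called.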
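To make the point concrete, here is a minimal plain-Java sketch of that behaviour. It is only a simplified model and not RxJava's actual implementation: the class names `ToListModel`, `ListOnComplete` and `SnapshotOnEach` are hypothetical and exist just for illustration. The first collector buffers items and delivers one list on completion, which mirrors what toList() does. The second delivers a snapshot after every item, which is roughly the behaviour the question seems to expect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model classes, not part of RxJava or LiveData.
class ToListModel {

    /** Buffers items and delivers a single list only on completion (models toList()). */
    static final class ListOnComplete<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        ListOnComplete(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            buffer.add(item); // nothing is delivered downstream yet
        }

        void onComplete() {
            downstream.accept(new ArrayList<>(buffer)); // the only delivery
        }
    }

    /** Delivers a copy of everything seen so far after each item. */
    static final class SnapshotOnEach<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        SnapshotOnEach(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            buffer.add(item);
            downstream.accept(new ArrayList<>(buffer)); // delivered on every item
        }
    }

    public static void main(String[] args) {
        List<List<String>> received = new ArrayList<>();
        ListOnComplete<String> collector = new ListOnComplete<>(received::add);
        collector.onNext("Value 0");
        collector.onNext("Value 1");
        System.out.println("before complete: " + received.size()); // before complete: 0
        collector.onComplete();
        System.out.println("after complete: " + received); // after complete: [[Value 0, Value 1]]

        List<List<String>> snapshots = new ArrayList<>();
        SnapshotOnEach<String> running = new SnapshotOnEach<>(snapshots::add);
        running.onNext("Value 0");
        running.onNext("Value 1");
        System.out.println("snapshots: " + snapshots); // snapshots: [[Value 0], [Value 1, Value 0]... see note
    }
}
```

Note that the last line prints `snapshots: [[Value 0], [Value 0, Value 1]]`, since each snapshot holds the items in arrival order. If a growing list on every emission is what you want, one option (an assumption about your intent, not the only fix) is to accumulate with an operator such as scan instead of toList(), or to complete the processor once you are done emitting.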