Search code examples
androidrx-javarx-androidevent-bus

RxJava as an event bus, onNext is called multiple times when only one event post


I'm implementing an event bus (RxBus) with RxJava.

RxBus.java

public class RxBus {

    private static final String TAG = LogUtils.makeTag(RxBus.class);
    private static final RxBus INSTANCE = new RxBus();

    private final Subject<Object, Object> mBusSubject = new SerializedSubject<>(PublishSubject.create());

    public static RxBus getInstance() {
        return INSTANCE;
    }

    public <T> Subscription register(final Class<T> eventClass, Action1<T> onNext) {
        return mBusSubject
                .filter(new Func1<Object, Boolean>() {
                    @Override
                    public Boolean call(Object event) {
                        return event.getClass().equals(eventClass);
                    }
                })
//                .filter(event -> event.getClass().equals(eventClass))
                .map(new Func1<Object, T>() {
                    @Override
                    public T call(Object obj) {
                        return (T) obj;
                    }
                })
//                .map(obj -> (T) obj)
                .subscribe(onNext);
    }

    public void post(Object event) {
        Log.d(TAG, "Apr12, " + "post event: " + event);
        mBusSubject.onNext(event);
    }
}

Post an event from viewHolder of a RecyclerView

public ViewHolder(LayoutInflater inflater, final ViewGroup parent) {
        super(inflater.inflate(R.layout.bill_card, parent, false));

        drawee = (SimpleDraweeView) itemView.findViewById(R.id.card_image);
        title = (TextView) itemView.findViewById(R.id.card_title);

        itemView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {

                Log.d(TAG, "Apr12, item clicked.");
                RxBus.getInstance().post(new ItemSelectedEvent(position));
            }
        });

        TagImageButton = (ImageButton) itemView.findViewById(R.id.tag_button);
        TagImageButton.setOnClickListener(new View.OnClickListener(){
            @Override
            public void onClick(View v) {
                Log.d(TAG, "Tag button clicked.");
                RxBus.getInstance().post(new ApplyTagForItemEvent(position));
            }
        });
    }
}

Subscribe the events from Fragment

@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
    super.onActivityCreated(savedInstanceState);

    mActivity = getActivity();
    Log.d(TAG, "getActivity(): " + getActivity());
    mItemClickSubscription = RxBus.getInstance().register(ItemSelectedEvent.class,
            new Action1<ItemSelectedEvent>() {
                @Override
                public void call(ItemSelectedEvent event) {
                    Log.d(TAG, "Apr12, " + "call event: " + event);
                    if (mDetail == null) {
                        if (getParentFragment() instanceof IFragmentStackHolder) {
                            IFragmentStackHolder fsh = (IFragmentStackHolder) getParentFragment();

                            Fragment details = new DetailCardFragment();
                            Bundle args = new Bundle();
                            args.putInt(ContentHolder.INDEX, event.getPosition());
                            details.setArguments(args);

                            fsh.pushFragment(details, event.getPairs());
                        }
                    }
                }
            });

    mApplyTagSubscription = RxBus.getInstance().register(ApplyTagForItemEvent.class,
            new Action1<ApplyTagForItemEvent>() {
                @Override
                public void call(ApplyTagForItemEvent event) {
                    IFragmentStackHolder fsh = (IFragmentStackHolder) getParentFragment();

                    Fragment tagApplyFragment = new TagApplyFragment();
                    Bundle args = new Bundle();
                    args.putInt(ContentHolder.INDEX, event.getPosition());
                    tagApplyFragment.setArguments(args);

                    fsh.pushFragment(tagApplyFragment, null);
                }
            }
    );
}

The problem is: when I click on itemView or TagImageButton, RxBus.post() is only called once (which is correct), but Action1 call() is called multiple times (not even constant times). Please see the log below.

D/**-CardContentView(31177): Apr12, item clicked.
D/**-RxBus(31177): Apr12, post event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e
D/**-CardDetailFragment(31177): Apr12, call event: com.*****.events.ItemSelectedEvent@1a11346e

How can I make it called only once?

EDIT: I found that if Action1 call() is called N times this time, it will be called N+1 times next time when I click on the item. It seems that the the observable is emitting all subsequently observed items in the history to the subscriber.


Solution

  • Finally find the solution.

    Very simple: I should have called mItemClickSubscription.unsubscribe(); and mApplyTagSubscription.unsubscribe(); in onStop().

    PublishSubject is used in the event bus. PublishSubject is a subject:

    Subject that, once an Observer has subscribed, emits all subsequently observed items to the subscriber.

    So if you don't unsubscribe() the subscription, this subscription will keep "recording" all the events happened in the history, and emit all of them once .subscribe(onNext) is executed.