由于最近在学RxJava2,同样避免不了使用Rxbus,但是RxJava1到RxJava2还是有很大的变化的,例如:在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压,Consumer和BiConsumer对Action1 和Action2进行了替换,Function 替换了Func1,采用BiFunction 替换Func 2..N。并且,它们都增加了throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要try-catch了。所以我们以前的基于Rxjava1的RxBus就要升级改造下。
对于RxJava1.x.x,相信大家都已经用的很熟了,RxBus也不在话下,下面是本人在RxJava1版本使用的RxJava,代码里有很详细的注释。这里就不多说了!
RxBus1
public class RxBus {// private static volatile RxBus mInstance; private SerializedSubject<Object, Object> mSubject; private HashMap<String, CompositeSubscription> mSubscriptionMap; /** * * 1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题, * 需要将 Subject转换为一个 SerializedSubject,类中把线程非安全的PublishSubject包装成线程安全的Subject。 * 2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。 * */ public RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); } // 这里使用Dagger来处理单例 /* private RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); } public static RxBus getInstance() { if (mInstance == null) { synchronized (RxBus.class) { if (mInstance == null) { mInstance = new RxBus(); } } } return mInstance; }*/ /** * 发送事件 * * @param o */ public void post(Object o) { mSubject.onNext(o); } /** * 返回指定类型的Observable实例 * 1、ofType操作符只发射指定类型的数据,其内部就是filter+cast * @param type * @param <T> * @return */ public <T> Observable<T> toObservable(final Class<T> type) { return mSubject.ofType(type); } /** * 是否已有观察者订阅 * * @return */ public boolean hasObservers() { return mSubject.hasObservers(); } /** * 一个默认的订阅方法 * * @param type * @param next * @param error * @param <T> * @return */ public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) { return toObservable(type) // 加上背压处理,不然有些地方会有异常,关于背压参考这里:https://gold.xitu.io/post/582d413c8ac24700619cceed .onBackpressureBuffer() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(next, error); } /** * 保存订阅后的subscription * @param o * @param subscription */ public void addSubscription(Object o, Subscription subscription) { if (mSubscriptionMap == null) { mSubscriptionMap = new HashMap<>(); } String key = o.getClass().getName(); if (mSubscriptionMap.get(key) != null) { mSubscriptionMap.get(key).add(subscription); } else {// 代表一组订阅,订阅 unsubscribed在一起 CompositeSubscription compositeSubscription = new CompositeSubscription(); compositeSubscription.add(subscription); mSubscriptionMap.put(key, compositeSubscription); } } /** * 取消订阅 * @param o */ public void unSubscribe(Object o) { if (mSubscriptionMap == null) { return; } String key = o.getClass().getName(); if (!mSubscriptionMap.containsKey(key)){ return; } if (mSubscriptionMap.get(key) != null) { mSubscriptionMap.get(key).unsubscribe(); } mSubscriptionMap.remove(key); } }
在这之前我们先熟悉下EventBus和RxJava的原理,这里只是简单聊聊:
EventBus
EventBus是Android下高效的发布/订阅事件总线机制,是一种发布订阅设计模式(Publish/Subsribe),或称作观察者设计模式。
RxJava
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
明白了什么是观察者模式,我们才能更好地对RxBbus1改造成我们需要的RxBus2.
RxBus2
翻Rxjava2的文档,我们知道在RxJava2中已经没了SerializedSubject类,我们可以调用toSerialized()方法,对Subject包装序列化,使其变成线程安全的。由于Rxjava2使用Flowable支持背压,我们可以将其转换成带背压的Flowable实例。基本上bus1和bus2的区别就这些!
public class RxBus { private HashMap<String, CompositeDisposable> mSubscriptionMap; private static volatile RxBus mRxBus; private final Subject<Object> mSubject; //单列模式 public static RxBus getIntanceBus(){ if (mRxBus==null){ synchronized (RxBus.class){ if(mRxBus==null){ mRxBus = new RxBus(); } } } return mRxBus; } public RxBus(){ mSubject = PublishSubject.create().toSerialized(); } public void post(Object o){ mSubject.onNext(o); } /** * 返回指定类型的带背压的Flowable实例 * * @param <T> * @param type * @return */ public <T>Flowable<T> getObservable(Class<T> type){ return mSubject.toFlowable(BackpressureStrategy.BUFFER) .ofType(type); } /** * 一个默认的订阅方法 * * @param <T> * @param type * @param next * @param error * @return */ public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){ return getObservable(type) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(next,error); } /** * 是否已有观察者订阅 * * @return */ public boolean hasObservers() { return mSubject.hasObservers(); } /** * 保存订阅后的disposable * @param o * @param disposable */ public void addSubscription(Object o, Disposable disposable) { if (mSubscriptionMap == null) { mSubscriptionMap = new HashMap<>(); } String key = o.getClass().getName(); if (mSubscriptionMap.get(key) != null) { mSubscriptionMap.get(key).add(disposable); } else { //一次性容器,可以持有多个并提供 添加和移除。 CompositeDisposable disposables = new CompositeDisposable(); disposables.add(disposable); mSubscriptionMap.put(key, disposables); } } /** * 取消订阅 * @param o */ public void unSubscribe(Object o) { if (mSubscriptionMap == null) { return; } String key = o.getClass().getName(); if (!mSubscriptionMap.containsKey(key)){ return; } if (mSubscriptionMap.get(key) != null) { mSubscriptionMap.get(key).dispose(); } mSubscriptionMap.remove(key); } }
使用
发送事件
RxBus rxBus = RxBus.getIntanceBus();//发送事件rxBus.post(new RxBusMessage("1")); rxBus.post(new RxBusMessage("2"));
接受事件
private void initRxBus() { rxBus = RxBus.getIntanceBus(); registerRxBus(RxBusMessage.class, new Consumer<RxBusMessage>() { @Override public void accept(@NonNull RxBusMessage rxBusMessage) throws Exception { //根据事件类型进行处理 if(TextUtils.equals(rxBusMessage.getMessage(),"1")){ mTvRebus.setText(rxBusMessage.getMessage()); }else if(TextUtils.equals(rxBusMessage.getMessage(),"2")){ rx2.setText(rxBusMessage.getMessage()); } } }); } //注册 public <T> void registerRxBus(Class<T> eventType, Consumer<T> action) { Disposable disposable = rxBus.doSubscribe(eventType, action, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e("NewsMainPresenter", throwable.toString()); } }); rxBus.addSubscription(this,disposable); }
销毁事件
public void unregisterRxBus() { rxBus.unSubscribe(this); }@Override protected void onDestroy() { super.onDestroy(); unregisterRxBus(); }
效果
转自https://blog.csdn.net/zcpHappy/article/details/75255918