首页 > Android > 教程 > 正文

基于RxJava2的RxBus,打造属于你自己的EventBus

2018-07-22 教程 657 ℃ 0 评论

前言

由于最近在学RxJava2,同样避免不了使用Rxbus,但是RxJava1到RxJava2还是有很大的变化的,例如:在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压,Consumer和BiConsumer对Action1 和Action2进行了替换,Function 替换了Func1,采用BiFunction 替换Func 2..N。并且,它们都增加了throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要try-catch了。所以我们以前的基于Rxjava1的RxBus就要升级改造下。

RxBus版本一

对于RxJava1.x.x,相信大家都已经用的很熟了,RxBus也不在话下,下面是本人在RxJava1版本使用的RxJava,代码里有很详细的注释。这里就不多说了!

  • RxBus1

public class RxBus {//    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;    
    private HashMap<String, CompositeSubscription> mSubscriptionMap;    
    /**
     *
     * 1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,
     * 需要将 Subject转换为一个 SerializedSubject,类中把线程非安全的PublishSubject包装成线程安全的Subject。
     * 2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
     *
     */
    public RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }    // 这里使用Dagger来处理单例
    /*
    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }*/

    /**
     * 发送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }    /**
     * 返回指定类型的Observable实例
     * 1、ofType操作符只发射指定类型的数据,其内部就是filter+cast
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> toObservable(final Class<T> type) {        
        return mSubject.ofType(type);
    }    /**
     * 是否已有观察者订阅
     *
     * @return
     */
    public boolean hasObservers() {        
        return mSubject.hasObservers();
    }    /**
     * 一个默认的订阅方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {        
            return toObservable(type)  // 加上背压处理,不然有些地方会有异常,关于背压参考这里:https://gold.xitu.io/post/582d413c8ac24700619cceed
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);

    }   
    
    /**
     * 保存订阅后的subscription
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {        
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();        
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {//            代表一组订阅,订阅 unsubscribed在一起
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }    
    /**
     * 取消订阅
     * @param o
     */
    public void unSubscribe(Object o) {        
        if (mSubscriptionMap == null) {            
            return;
      }
      String key = o.getClass().getName();        
      if (!mSubscriptionMap.containsKey(key)){            
          return;
      }        
      if (mSubscriptionMap.get(key) != null) {
        mSubscriptionMap.get(key).unsubscribe();
      }
      mSubscriptionMap.remove(key);
    }
}

RxBus版本二

在这之前我们先熟悉下EventBus和RxJava的原理,这里只是简单聊聊: 
EventBus 
EventBus是Android下高效的发布/订阅事件总线机制,是一种发布