首页 > Android > 教程 > 正文

基于RxJava2的RxBus,打造属于你自己的EventBus

2018-07-22 教程 136 ℃ 0 评论

前言

由于最近在学RxJava2,同样避免不了使用Rxbus,但是RxJava1到RxJava2还是有很大的变化的,例如:在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压,Consumer和BiConsumer对Action1 和Action2进行了替换,Function 替换了Func1,采用BiFunction 替换Func 2..N。并且,它们都增加了throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要try-catch了。所以我们以前的基于Rxjava1的RxBus就要升级改造下。

RxBus版本一

对于RxJava1.x.x,相信大家都已经用的很熟了,RxBus也不在话下,下面是本人在RxJava1版本使用的RxJava,代码里有很详细的注释。这里就不多说了!

  • RxBus1

public class RxBus {//    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;    
    private HashMap<String, CompositeSubscription> mSubscriptionMap;    
    /**
     *
     * 1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,
     * 需要将 Subject转换为一个 SerializedSubject,类中把线程非安全的PublishSubject包装成线程安全的Subject。
     * 2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
     *
     */
    public RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }    // 这里使用Dagger来处理单例
    /*
    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }*/

    /**
     * 发送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }    /**
     * 返回指定类型的Observable实例
     * 1、ofType操作符只发射指定类型的数据,其内部就是filter+cast
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> toObservable(final Class<T> type) {        
        return mSubject.ofType(type);
    }    /**
     * 是否已有观察者订阅
     *
     * @return
     */
    public boolean hasObservers() {        
        return mSubject.hasObservers();
    }    /**
     * 一个默认的订阅方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {        
            return toObservable(type)  // 加上背压处理,不然有些地方会有异常,关于背压参考这里:https://gold.xitu.io/post/582d413c8ac24700619cceed
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);

    }   
    
    /**
     * 保存订阅后的subscription
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {        
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();        
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {//            代表一组订阅,订阅 unsubscribed在一起
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }    
    /**
     * 取消订阅
     * @param o
     */
    public void unSubscribe(Object o) {        
        if (mSubscriptionMap == null) {            
            return;
      }
      String key = o.getClass().getName();        
      if (!mSubscriptionMap.containsKey(key)){            
          return;
      }        
      if (mSubscriptionMap.get(key) != null) {
        mSubscriptionMap.get(key).unsubscribe();
      }
      mSubscriptionMap.remove(key);
    }
}

RxBus版本二

在这之前我们先熟悉下EventBus和RxJava的原理,这里只是简单聊聊: 
EventBus 
EventBus是Android下高效的发布/订阅事件总线机制,是一种发布订阅设计模式(Publish/Subsribe),或称作观察者设计模式。 
RxJava 
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。 
明白了什么是观察者模式,我们才能更好地对RxBbus1改造成我们需要的RxBus2.

  • RxBus2 
    翻Rxjava2的文档,我们知道在RxJava2中已经没了SerializedSubject类,我们可以调用toSerialized()方法,对Subject包装序列化,使其变成线程安全的。由于Rxjava2使用Flowable支持背压,我们可以将其转换成带背压的Flowable实例。基本上bus1和bus2的区别就这些!

public class RxBus {
    private HashMap<String, CompositeDisposable> mSubscriptionMap;    
    private static volatile RxBus mRxBus;    
    private final Subject<Object> mSubject;    //单列模式
    public static RxBus getIntanceBus(){        
    if (mRxBus==null){            
        synchronized (RxBus.class){                
            if(mRxBus==null){
          mRxBus = new RxBus();
        }
       }
      }        
      return mRxBus;
    }    
    public RxBus(){
      mSubject = PublishSubject.create().toSerialized();
    }    
    public void post(Object o){
      mSubject.onNext(o);
    }    
    /**
     * 返回指定类型的带背压的Flowable实例
     *
     * @param <T>
     * @param type
     * @return
     */
    public <T>Flowable<T> getObservable(Class<T> type){        
        return mSubject.toFlowable(BackpressureStrategy.BUFFER)
                .ofType(type);
    }    
    /**
     * 一个默认的订阅方法
     *
     * @param <T>
     * @param type
     * @param next
     * @param error
     * @return
     */
    public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){        
        return getObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next,error);
    }    
    /**
     * 是否已有观察者订阅
     *
     * @return
     */
    public boolean hasObservers() {        
        return mSubject.hasObservers();
    }    
    /**
     * 保存订阅后的disposable
     * @param o
     * @param disposable
     */
    public void addSubscription(Object o, Disposable disposable) {        
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();        
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(disposable);
        } else {            //一次性容器,可以持有多个并提供 添加和移除。
            CompositeDisposable disposables = new CompositeDisposable();
            disposables.add(disposable);
            mSubscriptionMap.put(key, disposables);
        }
    }    /**
     * 取消订阅
     * @param o
     */
    public void unSubscribe(Object o) {        
        if (mSubscriptionMap == null) {            
            return;
      }

      String key = o.getClass().getName();        
      if (!mSubscriptionMap.containsKey(key)){            
          return;
      }        
      if (mSubscriptionMap.get(key) != null) {
        mSubscriptionMap.get(key).dispose();
      }

      mSubscriptionMap.remove(key);
    }
}
  • 使用

发送事件

RxBus rxBus = RxBus.getIntanceBus();//发送事件rxBus.post(new RxBusMessage("1"));
rxBus.post(new RxBusMessage("2"));

接受事件

private void initRxBus() {
        rxBus = RxBus.getIntanceBus();
        registerRxBus(RxBusMessage.class, new Consumer<RxBusMessage>() {            
                @Override
            public void accept(@NonNull RxBusMessage rxBusMessage) throws Exception {             
                    //根据事件类型进行处理
                if(TextUtils.equals(rxBusMessage.getMessage(),"1")){
                    mTvRebus.setText(rxBusMessage.getMessage());
                }else if(TextUtils.equals(rxBusMessage.getMessage(),"2")){
                    rx2.setText(rxBusMessage.getMessage());
                }
            }
        });
    }
    //注册
    public <T> void registerRxBus(Class<T> eventType, Consumer<T> action) {
        Disposable disposable = rxBus.doSubscribe(eventType, action, new Consumer<Throwable>() {            
                @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e("NewsMainPresenter", throwable.toString());
            }
        });
        rxBus.addSubscription(this,disposable);
    }

销毁事件

 public void unregisterRxBus() {
        rxBus.unSubscribe(this);
    }@Override
    protected void onDestroy() {        
            super.onDestroy();
        unregisterRxBus();
    }

效果

20170717165307651.gif

转自https://blog.csdn.net/zcpHappy/article/details/75255918

猜你喜欢

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

请填写验证码
日历
«   2018年10月   »
1234567
891011121314
15161718192021
22232425262728
293031
网站分类
最近发表
标签列表
站点信息
  • 文章总数:153
  • 页面总数:1
  • 分类总数:11
  • 标签总数:171
  • 评论总数:4
  • 浏览总数:94719