RxBus



public class RxBus {
   
    private static volatile RxBus defaultInstance;

    private final Subject<Object, Object> bus;
    

    public RxBus() {
      bus = new SerializedSubject<>(PublishSubject.create());
    }
    
    // 单例RxBus
    public static RxBus getDefault() {
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new RxBus();
                }
            }
        }
        return defaultInstance ;
    }
    
    // 发送一个新的事件
    public void post (Object o) {
        bus.onNext(o);
    }
    
    // 根据传递的EventType返回特定类型的被观察者
    public <T> Observable<T> toObservable (Class<T> eventType) {
        return bus.ofType(eventType);
    }
}
   
       
               
//使用
//设置观察者
rxSubscription = RxBus.getDefault()
        .toObserverable(Event.class)
        .subscribe( new Action1<Event>() {
                            @Override
                            public void call(Event userEvent) {
                            long id = userEvent.getId();
                            String name = userEvent.getName();
                            ...
                            }
                        },
                    new Action1<Throwable>() {
                             @Override
                             public void call(Throwable throwable) {
                             
                             }        
                        }
         );
         
//发射数据
RxBus.getDefault().post(new Event ());

//取消订阅事件
@Override
protected void onDestroy() {
    super.onDestroy();
    if(!rxSubscription.isUnsubscribed()) {
        rxSubscription.unsubscribe();
    }
}