qiuyadong's Homepage

RxJava官网学习

2020-07-21

普天皆灭焰,匝地尽藏烟。不知何处火,来就客心然。

简单理解

RxJava概念

其实就是操作一个生产线上的产品,当产品到了不同的包装环节,都有操作员对其进行查看和包装,不同的制作环节将引起不同的工作流程;

它扩展了观察者模式来支持数据/事件序列,并添加了运算符,使开发者可以声明性地组合序列,同时抽象出对低级线程、同步、线程安全性和并发数据结构等事物的观察。

RxJava 1.x 基本用法

  • RxJava 1.x

      //创建了一个Observable对象,传入的参数是OnSubscribe接口,故而在参数内实现其接口的call 方法
      Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                 //在创建Observable中实现OnSubscribe接口的call 方法中,注意其参数为Subscriber订阅者,调用订阅者的onNext方法传入数据,再调用onCompleted 方法。
                  @Override
                  public void call(Subscriber<? super String> subscriber) {
                      if (!subscriber.isUnsubscribed()) {
                          subscriber.onNext("test");
                          subscriber.onCompleted();
                      }
                  }
              });
      //调用Observable对象的订阅subscribe 事件方法,其参数传入Observer接口,故而在参数内实现其接口的onCompleted()、 onError(Throwable e)、onNext(T t)方法。
              Subscription subscription = observable.subscribe(new Observer<String>() {
                  @Override
                  public void onCompleted() {
                      System.out.println("onCompleted");
                  }
                  @Override
                  public void onError(Throwable e) {
                  }
                  @Override
                  public void onNext(String s) {
                      System.out.println("onNext:" + s);
                  }
              });

  • Subscription接口

    Subscription是Observable调用subscribe订阅事件方法后返回的一个接口,其内容也很简单,两个方法,一个解除订阅的unsubscribe()方法,一个是判断是否解除的isUnsubscribed() 方法;


        public interface Subscription {
             void unsubscribe();
             boolean isUnsubscribed();
        }

  • Observer 接口

    Observer是Observable调用subscribe订阅事件方法中传入的参数,也是一个接口,三个待实现的方法,分别是回调完成方法onCompleted()、 错误处理方法onError(Throwable e)、数据接收方法onNext(T t)。


        public interface Observer<T> {
            void onCompleted();
            void onError(Throwable e);
             void onNext(T t);
        }

  • Subscriber 抽象类

    在创建Observable时,需要给create传入参数Observable.OnSubscribe接口,并实现其接口的call方法,此方法的参数类型就是Subscriber,而开发者也是在此方法中调用onNext、onComplete,因此可以推测Subscriber必然实现了Observer 接口。 仔细查看源码,确实如此,它还实现了Subscription 接口中的unsubscribe()、isUnsubscribed()两个方法,简单做了封装,但并未详细实现Observer 接口中的三个方法,因此只是一个抽象类。


        public abstract class Subscriber<T> implements Observer<T>, Subscription {
        }

  • OnSubscriber 接口

    OnSubscriber是在创建Observable时传入create 方法中的参数类型,也是一个接口。此接口又是继承于Action1 接口,Action1 接口中有一个未实现的call方法,而Action1 接口又继承于Action接口,Action接口是一个空实现,最后它又继承于Function接口,Function接口也是一个空实现。


        OnSubscriber -> Action1 -> Action -> Fcuntion

  • Observable 工具类

    • create方法

      首先查看Observable的静态创建create方法,可见其只是多做了一层封装,new这个对象时,将参数onSubscribe传入构造方法中,而RxJavaHooks.onCreate(f)也只是多做了一个判断,最终返回的还是onSubscribe。


          public static <T> Observable<T> create(OnSubscribe<T> f) {
                  return new Observable<T>(RxJavaHooks.onCreate(f));
              }

- subscribe方法

          public final Subscription subscribe(final Observer<? super T> observer) {
                  if (observer instanceof Subscriber) {
                      return subscribe((Subscriber<? super T>)observer);
                  }
                  if (observer == null) {
                      throw new NullPointerException("observer is null");
                  }
                  return subscribe(new ObserverSubscriber<T>(observer));
              }

  这里使用Subscriber将我们传入的observer接口做了一层简单的封装,来查看ObserverSubscriber的具体实现:

          public final class ObserverSubscriber<T> extends Subscriber<T> {
              final Observer<? super T> observer;

              public ObserverSubscriber(Observer<? super T> observer) {
                  this.observer = observer;
              }

              @Override
              public void onNext(T t) {
                  observer.onNext(t);
              }

              @Override
              public void onError(Throwable e) {
                  observer.onError(e);
              }

              @Override
              public void onCompleted() {
                  observer.onCompleted();
              }
          }

  这里使用Subscriber将我们传入的observer接口做了一层简单的封装。还是回到它重载的另一个subscribe 方法:

          public final Subscription subscribe(Subscriber<? super T> subscriber) {
                  return Observable.subscribe(subscriber, this);
              }

由以上代码可知,从调用的参数为observer接口的subscribe 方法内做了一层封装,调用了参数为subscriber抽象类的subscribe 方法,最终调用的是参数为subscriber、observable的静态subscribe 方法。

           RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
           return RxJavaHooks.onObservableReturn(subscriber);

  第一行代码调用OnSubscriber接口的call方法,这意味着我们创建Observable时实现的call方法回被调用,那么call方法中对数据传递、标志完成的操作会执行,即实现的Observer接口中的onNext方法中接收到数据,onComplete()方法也被调用。最后返回Subscription。

RxJava 1.x 理解

  • Observable(被观察者)

    • 通过Observable创建一个可观察序列

    • 通过subscribe去注册一个观察者

  • Observer(观察者)

    • 用于接收数据的观察者

    • 作为Observable的subscribe方法的参数

  • Subscription(订阅者)

    • 订阅,用于描述观察者和被观察者之间的关系

    • 用于取消订阅和获取当前订阅的状态

  • OnSubscribe

    • 当订阅时会发送此接口的调用

    • 在Observable内部,实际作用是向订阅者发布数据

  • Subscriber

    • 实现了Observer和Subscription接口

RxJava 2.x基本用法

  • Rx2.x

       Observable.create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> e) throws Exception {
                              if (!e.isDisposed()) {
                                  e.onNext("test");
                                  e.onComplete();
                              }
                          }
                      }).subscribe(new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable d) {
                              System.out.println("onSubscribe");
                          }
                          @Override
                          public void onNext(String value) {
                              System.out.println("onNext:" + value);
                          }
                          @Override
                          public void onError(Throwable e) {

                          }
                          @Override
                          public void onComplete() {
                              System.out.println("onCompleted");
                          }
                      });

  • 在通过调用Observable的create() 方法创建对象时,传入的参数是ObservableOnSubscribe接口,实现的是其接口的subscribe 方法,方法内的参数是ObservableEmitter,不再是1.x版本的OnSubscribe接口(call 方法)。

  • 在调用Observable对象的订阅subscribe 事件方法,其参数传入的Observer接口,多了一个需要实现onSubscribe(Disposable d)方法,方法内的参数是Disposable。

  • 与RxJava 1.x区别

    • Observer接口

      多了一个void onSubscribe(Disposable d);方法,用于观察者取消事件订阅,来查看Disposable接口组成:(注意:2.x版本新增的Disposable可以做到切断订阅事件的操作,让观察者Observer不再接收上游事件,避免内存泄漏)


        public interface Disposable {
            void dispose();
            boolean isDisposed();
        }

接口中两个方法,一个dispose方法,另一个事检测是否dispose方法,其结构与Subscription类似。
  • ObservableOnSubscribe接口

    是创建Observable时传入的接口参数,在2.x版本中单独独立出来了。为观察者提供了取消订阅连接的功能,该接口中的subscribe方法用于接收ObservableEmitter的实例,该实例允许用安全的方式取消事件。


        public interface ObservableOnSubscribe<T> {
            void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
        }

  • ObservableEmitter接口

    前两个接口我们一直都在强调2.x版本新增的Disposable切断订阅事件,使得观察者不再接收上游事件的功能,可预先此接口也是为它所用,作用是设置Emitter的disposable和cancellable


        public interface ObservableEmitter<T> extends Emitter<T> {
             void setDisposable(@Nullable Disposable d);
            void setCancellable(@Nullable Cancellable c);
             boolean isDisposed();
             boolean tryOnError(@NonNull Throwable t);
        }

继续查看Emitter接口的组成,会发现其中包含的三个方法竟然与Observer接口完全相同,其中缘由后续讲解。

        public interface Emitter<T> {
            void onNext(@NonNull T value);
            void onError(@NonNull Throwable error);
             void onComplete();
        }

  • Observable类

    • create方法

            public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
                  ObjectHelper.requireNonNull(source, "source is null");
                  return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
              }

  此方法中可以得出两个信息,第一个是调用了RxJavaPlugins的静态onAssembly方法,第二个是传入此方法的参数,将ObservableOnSubscribe接口通过ObservableCreate做了一次封装。首先来了解onAssembly方法:

           public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
                  Function<? super Observable, ? extends Observable> f = onObservableAssembly;
                  if (f != null) {
                      return apply(f, source);
                  }
                  return source;
              }

  此方法中的一个关键成员变量onObservableAssembly,它最初被赋值为null,为外界提供了set方法,因此当我们刚开始调用时f 被判断为null,直接将source返回。再来查看new ObservableCreate<T>(source) 具体构成:

          public final class ObservableCreate<T> extends Observable<T> {
              final ObservableOnSubscribe<T> source;

              public ObservableCreate(ObservableOnSubscribe<T> source) {
                  this.source = source;
              }

              @Override
              protected void subscribeActual(Observer<? super T> observer) {
                  CreateEmitter<T> parent = new CreateEmitter<T>(observer);
                  observer.onSubscribe(parent);

                  try {
                      source.subscribe(parent);
                  } catch (Throwable ex) {
                      Exceptions.throwIfFatal(ex);
                      parent.onError(ex);
                  }
              }
          }

- subscribe方法

  回到Observable,查看subscribe方法:

          public final void subscribe(Observer<? super T> observer) {     
                      observer = RxJavaPlugins.onSubscribe(this, observer);
                      subscribeActual(observer);
          }

  首先查看到observer = RxJavaPlugins.onSubscribe(this, observer);,在Observable的create中也出现了RxJavaPlugins相关用法,而此处它的作用也是类似,就是将传入的参数observer返回,重点在于后面的subscribeActual(observer);,也就是刚介绍ObservableCreate实现的subscribeActual 方法。

- 总结

  到了这里,必须强调了一下,再次回顾RxJava 1.x版本中Observable的subscribe处理,通过调用创建Observable传入的OnSubscribe接口的call方法正式触发订阅事件,后续Observe接口中onNext、onComplete方法才回被调用。而RxJava 2.x版本中的处理亦如是,只不过OnSubscribe接口换成了ObservableOnSubscribe接口,call方法换成了subscribe方法,参数由subscriber更换成了ObservableEmitter,这些变换也是RxJava 2.x的改进,新增的Disposable切断订阅事件,使得观察者不再接收上游事件的功能,来避免内存泄漏。由此可见以上处理过程,RxJava 2.x 与 1.x的区别不大。

RxJava 2.x理解

在以上基本元素的对比上,两个版本其实没有很大的区别,只是部分写法做了改变,2.x版本中将精华ObservableCreate独立出来,但其核心内容还是与1.x 相同,调用传进来的OnSubscribe接口中的subscribe(call)方法,并传入observe接口到其中。而2.x中还特地使用了CreateEmitter对传入的observe接口做了包装,因此我们手动调用onNext方法时,实际上就是通过observe接口调用这些方法。

  • Observable

    被观察者,不支持背压

    可通过Observable创建一个可观察到序列

    通过subscribe去注册一个观察者

  • Observer

    用于接收数据的观察者

    作为Observable的subscribe方法的参数

  • Disposable

    和RxJava1的Subscription作用相当

    用于取消订阅和获取当前的订阅状态

  • ObservableOnSubscriber

    当订阅时会出发此接口的调用

    在Observable内部,实际作用时向订阅者发布数据

  • Emitter

    一个发射数据的接口,和Observer 的方法类似

    本质是对Observer和Subscribe的包装

被压

  • 概念

    异步环境下产生的问题:同步环境下会等待一件事处理完后再进行下一步,而异步环境下是处理完一件事,未等它得出结果接着处理下一步,在获得结果之后进行回调,再处理结果。

    发送和处理速度不统一:例如生产者生产出的产品放置到缓存队列中,供消费者消费。若生产者生产的速度大于消费者消耗的速度,则会出现缓存队列溢出的问题。

    是一种流速控制及解决策略:例如背压中的丢弃策略,一旦发现缓存队列已满,为了整个过程顺利进行,则会丢弃最新产生的产品,避免溢出,因此背压也是一种流速控制的解决策略

  • 案例


       Flowable.create(new FlowableOnSubscribe<String>() {
                  @Override
                  public void subscribe(FlowableEmitter<String> e) throws Exception {
                      if (!e.isCancelled()) {
                          e.onNext("Flowable test");
                          e.onComplete();
                      }
                  }
              }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
                  @Override
                  public void onSubscribe(Subscription s) {
                      s.request(Long.MAX_VALUE);
                      System.out.println("onSubscribe");
                  }
                  @Override
                  public void onNext(String s) {
                      System.out.println("onNext:" + s);
                  }
                  @Override
                  public void onError(Throwable t) {
                  }
                  @Override
                  public void onComplete() {
                      System.out.println("onCompleted");
                  }
              });

  • 首先创建通过调用Flowable的create方法创建实例,传入两个参数,第一个是OnSubscribe接口实例,并实现其subscribe方法,第二参数则是背压特有的背压策略;

  • 调用Flowable的subscribe方法,你会发现不同于之前,传入的并非是observer接口,而是Subscriber接口,实现onSubscribe、onNext、onComplete、onError方法。

RxJava 3.0官方文档

概念

  • RxJava

    jvm响应式编程的扩展:就是一个lib包通过使用观察序列实现同步、基于事件的编程方式; 它扩展了观察者模式以支持数据/事件序列,并添加了操作符,允许您以声明的方式将序列组合在一起,同时抽象出对底层线程、同步、线程安全和并发数据结构等问题的关注。

  • 基础类

    • Flowable

      0 . .N流,支持反应流和被压

    • Observable

      0 . .N流,不支持被压

    • Single

      一个流,期望是一项或者一个错误

    • Completable

      一个流没有项但是是一个完成了的或者一个错误标示

    • Maybe

      一个流没有项,期望是一项或者一个错误

术语

  • Upstream(上游)、downstream(下游)

    RxJava中的数据流由一个源、零个或更多中间步骤组成,后面跟着一个数据消费者或组合子步骤(其中步骤负责通过某种方式消耗数据流)


      source.operator1().operator2().operator3().subscribe(consumer);

      source.flatMap(value -> source.operator1().operator2().operator3());

如果自己是operator2,那么operator1是上游,operator3就是下游;

  • Objects in motion(运行对象)

    emission, emits, item, event, signal,data和 message在Rxjava中被认为是同义词,代表对象在跟随数据量而运行;

  • Backpressure(被压)

    当数据流通过异步步骤运行时,每一步可能以不同的速度执行不同的事情。

    这类步骤通常会由于临时缓冲或需要跳过/删除数据而导致内存使用量增加,为了避免这些步骤过于庞大,应用了所谓的backpressure,这是一种流控制形式,其中步骤可以表示它们准备处理多少项。这允许在某些情况下限制数据流的内存使用,通常情况下,一个步骤无法知道上游将发送多少项给它。

    在RxJava中,专用的流型类被指定为支持背压操作,而Observable类被指定为非背压操作(短序列、GUI交互等)。其他类型,Single, Maybe和Completable不支持背压,也不应该支持;总有地方可以暂时存放一些对象。

  • Assembly time(装配时)

    使用各种中间操作符进行数据流的准备发生在所谓的装配时间:

  • Subscription time(订阅时)

    在内部建立处理步骤链的流程上调用subscribe()时的临时状态

  • Runtime(运行时)

    这是流主动发出项、错误或完成信号时的状态

  • Simple background computation(简单后台计算)

    RxJava的一个常见用例是在后台线程上运行一些计算、网络请求,并在UI线程上显示结果(或错误);


      Flowable<String> source = Flowable.fromCallable(() -> {
          Thread.sleep(1000); //  imitate expensive computation
          return "Done";
      });

      Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

      Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

      showForeground.subscribe(System.out::println, Throwable::printStackTrace);

      Thread.sleep(2000);

通常,您可以通过subscribeOn将计算或阻塞IO移动到其他线程。一旦数据准备好了,就可以确保通过observeOn在前台或GUI线程上处理它们。

  • Schedulers(调度者)

    RxJava操作符并不直接与线程或ExecutorServices一起工作,而是与所谓的调度器一起工作,这些调度器抽象出了一个统一API背后的并发源。RxJava 3提供了几个可通过调度器实用程序类访问的标准调度器。

    • Schedulers.computation() 在后台使用固定数量的专用线程运行密集的计算工作。大多数异步操作符将此作为默认调度程序。

    • Schedulers.io() 对一组动态变化的线程运行类I/O或阻塞操作。

    • Schedulers.single() 以顺序和FIFO的方式在单个线程上运行工作。

    • Schedulers.trampoline() 通常为了测试的目的,在其中一个参与线程中按顺序和FIFO的方式运行工作。

    此外,还有一个选项可以通过schedul. from(Executor)将现有的执行程序(及其子类型,如ExecutorService)包装到调度程序中。例如,可以使用它来拥有更大但仍然固定的线程池(分别与compute()和io()不同)。

  • Concurrency within a flow(流的并发性)

    RxJava中的流在本质上是顺序的,被分割为可以彼此并发运行的处理阶段:


      Flowable.range(1, 10)
        .observeOn(Schedulers.computation())
        .map(v -> v * v)
        .blockingSubscribe(System.out::println);

示例流在计算调度程序中将数字从1平方到10,并在“主”线程(更准确地说,是blockingSubscribe的调用线程)上消耗结果。但是,lambda v -> v * v并不在此流中并行运行;它在同一个计算线程上一个接一个地接收值1到10。

  • Parallel processing(并行处理)

      Flowable.range(1, 10)
        .flatMap(v ->
            Flowable.just(v)
              .subscribeOn(Schedulers.computation())
              .map(w -> w * w)
        )
        .blockingSubscribe(System.out::println);

实际上,RxJava中的并行性意味着运行独立的流并将它们的结果合并回单个流。操作符flatMap首先将从1到10的每个数字映射到它自己的流动,运行它们并合并计算得到的结果。

flatMap不保证任何顺序,并且来自内部流的项可能最终交错。有可供选择的操作符:

  • concatMap

    映射并每次运行一个内部流

  • concatMapEager

    它“立即”运行所有内部流,但输出流将按照创建这些内部流的顺序。 或者,Flowable.parallel()操作符和ParallelFlowable类型帮助实现相同的并行处理模式:


      Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .map(v -> v * v)
        .sequential()
        .blockingSubscribe(System.out::println);

  • Dependent sub-flows(依赖子流)

    flatMap是一个强大的运算符,在很多情况下都有帮助。例如,给定一个返回Flowable的服务,我们想用第一个服务发出的值调用另一个服务:


      Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

      inventorySource
          .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
                  .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
          .subscribe(System.out::println);

  • Continuations(延续)

    有时,当一个项可用时,人们希望对它执行一些依赖的计算。这有时被称为延续,根据应该发生什么以及所涉及的类型,可能会涉及到各种操作符来完成。

    • Dependent

      最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:


        service.apiCall()
        .flatMap(value -> service.anotherApiCall(value))
        .flatMap(next -> service.finalCall(next))

通常情况下,后面的序列会需要来自以前映射的值。这可以通过移动外部的平面到之前的平面的内部来实现,例如:

        service.apiCall()
        .flatMap(value ->
            service.anotherApiCall(value)
            .flatMap(next -> service.finalCallBoth(value, next))
        )

在这里,由lambda变量章节提供的原始值将在内部平面映射中可用。
  • Non-dependent

    在其他情况下,第一个源/数据流的结果是不相关的,人们希望继续使用准独立的另一个源。在这里,flatMap也可以工作:


        Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
        continued.map(v -> v.toString())
          .subscribe(System.out::println, Throwable::printStackTrace);

然而,在这种情况下,延续保持可观察,而不是可能更合适的单个。(因为从flatMapSingle的角度来看,sourceObservable是一个多值源,因此映射也可能导致多个值)。

通常有一种更有表现力(也更低开销)的方法,使用Completable作为中介和它的操作符,然后继续使用其他东西:

        sourceObservable
          .ignoreElements()           // returns Completable
          .andThen(someSingleSource)
          .map(v -> v.toString())

sourceObservable和someSingleSource之间唯一的依赖关系是,前者应该正常完成,以便后者被消费。   - Deferred-dependent
有时,前一个序列和新序列之间存在隐式的数据依赖关系,由于某种原因,这些依赖关系没有通过“常规通道”传递。人们倾向于将这种延续写如下:

        AtomicInteger count = new AtomicInteger();

        Observable.range(1, 10)
          .doOnNext(ignored -> count.incrementAndGet())
          .ignoreElements()
          .andThen(Single.just(count.get()))
          .subscribe(System.out::println);

不幸的是,这会输出0,因为Single.just(count.get())是在数据流尚未运行时计算的。我们需要一些东西来推迟这个单一源的评估,直到运行时,当主源完成:

        AtomicInteger count = new AtomicInteger();

        Observable.range(1, 10)
          .doOnNext(ignored -> count.incrementAndGet())
          .ignoreElements()
          .andThen(Single.defer(() -> Single.just(count.get())))
          .subscribe(System.out::println);

参考文档:

1:https://github.com/ReactiveX/RxJava

2:https://blog.csdn.net/itermeng/article/details/80139074



Comments