RxJava 2.x 学习笔记

RxJava 学习笔记

观察者、被观察者

Observable、Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 1. 创建上游(被观察者)
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onError();
emitter.onComplete();
}
});

// 2. 创建下游(观察者)
Observer observer = new Observer<Integer>() {
Disposable disposable = null;

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
disposable = d;
}

@Override
public void onNext(Integer o) {
log.d(TAG, "onNext: " + o)
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};

// 3. 建立连接(订阅)
observable.subscribe(observer);

// 4. 打印结果:
onSubscribe:
onNext: 1
onComplete:

ObservableEmitter

  • ObservableEmitter:可理解为发射器,可发射3种类型的事件,即调用onNext()、onComplete()、onError() 方法。
  • 上游可以发送多个 onNext() 和 onComplete(),只能发送一个 onError();可以不发送 onComplete() 和 onError()。
  • 上游发送 onComplete() 或者 onError() 后会继续发送其他事件,但是下游接收到 onComplete() 或者 onError() 事件之后不再接收其他事件。
  • 上游可以先发送 onError() 再发送 onComplete(),不能先发送 onComplete() 再发送 onError()。

Disposable

  • 调用 dispose() 方法可以切断上下游之间的连接,上游可以继续发送除 onError() 之外的事件,但是下游不再接收事件。
  • 可通过两种方式获取 Disposable 对象,分别为:
    • 在 onSubscribe() 回调方法的参数中获取;
    • 某些重载的订阅方法 subscribe() 返回值是 Disposable 对象:
      1
      2
      3
      4
      5
      6
      public final Disposable subscribe() {}
      public final Disposable subscribe(Consumer<? super T> onNext) {}
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
      public final void subscribe(Observer<? super T> observer) {}

Flowable、Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 1. 创建上游
Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
// emitter.onError();
}
}, BackpressureStrategy.ERROR);

// 2. 创建下游
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe: ");
s.request(1);
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};

// 3. 建立连接(订阅)
flowable.subscribe(subscriber);

BackpressureStrategy

  • 表示背压策略,有以下几种:
    • MISSING:onNext() 事件没有任何缓存和丢弃,下游要处理溢出
    • ERROR:缓存区默认为 128,当上游发送事件的速度太快而下游处理不过来时会抛出 MissingBackpressureException
    • BUFFER:缓存区大小无限制,使用不当会 OOM
    • DROP:缓存最近的 onNext() 事件
    • LATEST:缓存区会保留最后一个 onNext() 事件

Subscription

  • 和 Disposable 类似,调用 cancel() 表示请求停止发送事件,可切断上下游的连接
  • 必须显示调用 request(long n),表示下游可以处理 n 个事件

FlowableEmitter

  • 继承自 Emitter,即 onNext()、onError()、onComplete()
  • requested():表示下游的处理能力,即下游 s.request() 的大小
  • isCancelled():下游是否请求停止发送,即 s.cancel()

Single、SingleObserver

  • 上游可以发送多个事件,但是下游只能接收到一个事件
  • onSuccess() 和 onError() 是互斥
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    // 1. 创建上游
    Single<Integer> single = Single.create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> e) throws Exception {
    for (int i = 0; i < 3; i++) {
    e.onSuccess(i);
    Log.d(TAG, "subscribe: " + i);
    }
    }
    });

    // 2. 创建下游
    SingleObserver<Integer> observer = new SingleObserver<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onSuccess(Integer integer) {
    // 相当于 onNext() 和 onComplete()
    Log.d(TAG, "onSuccess: " + integer);
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError: ");
    }
    };

    // 3. 建立连接(订阅)
    single.subscribe(observer);

    // 4. 打印结果:
    onSubscribe:
    onSuccess: 0
    subscribe: 0
    subscribe: 1
    subscribe: 2

SingleEmitter

  • 是个接口,onSuccess()、onError()

Completable、CompletableObserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 1. 创建上游
Completable completable = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onComplete();
Log.d(TAG, "subscribe: " + i);
}
}
});

// 2. 创建下游
CompletableObserver observer = new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
};

// 3. 建立连接(订阅)
completable.subscribe(observer);

// 4. 打印结果:
onSubscribe:
onComplete:
subscribe: 0
subscribe: 1
subscribe: 2
  • 上游可以发送多个事件,但是下游只能接收到一个事件
  • onComplete() 和 onError() 是互斥

Maybe、MaybeObserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 1. 创建上游
Maybe<Integer> maybe = Maybe.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onSuccess(i);
// 不会起作用
e.onComplete();
}
}
});

// 2. 创建下游
MaybeObserver<Integer> observer = new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onSuccess(Integer integer) {
Log.d(TAG, "onSuccess: " + integer);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};

// 3. 建立连接(订阅)
maybe.subscribe(observer);

// 4. 打印结果:
onSubscribe:
onSuccess: 0
  • 类似于 Single 和 Completable 的混合体,onSuccess()、onComplete()、onError() 三者互斥

线程调度

subscribeOn()

  • 表示上游发送事件的线程
  • 有多个 subscribeOn() 时,上游只会在第一个 subscribeOn() 表示的线程发送事件

observeOn()

  • 表示下游接收事件的线程
  • 有多个 observeOn() 时,下游只会在最后一个 observeOn() 表示的线程接收事件

Scheduler

  • Schedulers.immediate(): 默认的 Scheduler,直接在当前线程运行。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread(): Android 专用的,它指定的操作将在 Android 主线程运行。

背压策略

ERROR

  • error:当下游没有请求数据时,上游最多只能发送128个事件,多于 128 时将会调用 onError() 抛出 MissingBackpressureException 异常;当上下游流速均衡(即上游发送数据和下游处理数据的速度相同)时,上游可以发送无限数据,不会出现 OOM
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 ErrorAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的 onNext() 方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 ErrorAsyncEmitter 的实现方法 onOverflow()onOverflow() 方法里面调用 onError() 方法抛出 MissingBackpressureException 异常
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync() 方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;,然后再请求上游发送 96 个数据到下游的缓存队列(此时如果上游继续发送数据(e.onNext(i)),由于 AtomicLong 大于 0 会继续发送数据到下游缓存队列,否则就不发送数据到下游),与此同时下游继续从缓存队列取数据发送出去,发送一个数据就 e++,直到 while(e != r) 不成立导致不再发送给外界。此时如果外界主动调用 s.request(n) 请求数据将继续发送数据给外界
  • 上游发送完全部数据之前,如果上游发送过的所有数据比下游请求过的所有数据 >= 96 时抛出 MissingBackpressureException 异常。因为每次下游都是请求 96 个数据,96 保存在上游的 AtomicLong 中,发送一个数据就减 1,当 AtomicLong 为 0 时就抛出 MissingBackpressureException 异常

BUFFER

  • buffer: 上游可以发送无限个数据,不会出现 MissingBackpressureException 异常,但是会 OOM
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 BufferAsyncEmitter 的 onNext() 方法中先用 queue.offer(t) 保存发送过的所有数据,然后再调用 drain() 方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync() 方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++,这时发送了 4 个数据后 while(e != r) 不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request() 方法设置 AtomicLong 的值为 96,再去调用 BufferAsyncEmitter 实现的 onRequested() 方法,onRequested() 中再调用 drain() 方法完成数据的发送。drain() 方法会从 queue 中取出未发送过的数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request() 方法中去调用空方法 onRequested() ,而 onNext() 方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界

DROP

  • drop: 上游可发送无限个数据
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 DropAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的 onNext() 方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 onOverflow()onOverflow() 是个空方法,也就是丢弃数据
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync() 方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++,这时发送了 4 个数据后 while(e != r) 不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request() 方法中去调用空方法 onRequested() ,而 onNext() 方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界

LATEST

  • latest: 上游可以发送无限个数据
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 LatestAsyncEmitter 的 onNext() 方法中先用 queue(AtomicReference对象) 保存当前发送的数据,所以发送完所有数据后 queue 保存的是最后一个数据,然后再调用 drain() 方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync() 方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++,这时发送了 4 个数据后 while(e != r) 不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request() 方法中去调用 LatestAsyncEmitter 实现的 onRequested() 方法,onRequested() 中再调用 drain() 方法完成数据的发送。drain() 方法会把 queue 中保存的最后一个数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界,所以 LATEST 策略总是可以请求到上游的最后一个数据

MISSING

  • missing: 上游没有背压策略,需要下游通过背压操作符(onBackpressureBuffer()onBackpressureDrop()onBackpressureLatest())来指定背压策略
  • 当下游没有指定背压策略时会抛出 MissingBackpressureException 异常

创建操作符

just

  • 最多只能发送 10 个数据,最后发送 onComplete
  • 当发送数据超过 2 个时,内部调用 fromArray()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.just(1, 2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

// 打印结果:
onSubscribe:
onNext: 1
onNext: 2
onComplete:

fromArray

1
fromArray(T... items)
  • 如果参数是个空数组,直接调用 empty() 创建符;如果只有一个元素,则调用 just()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
    return empty();
    } else
    if (items.length == 1) {
    return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int[] ints = new int[]{1, 2};

Observable.fromArray(1, 2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Integer integer) {
// 如果传入的是数组,此处的参数也是数组
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

// 打印结果:
onSubscribe:
onNext: 1
onNext: 2
onComplete:

empty

  • 只发送 onComplete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Object o) {
Log.d(TAG, "onNext: " + o);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

// 打印结果:
onSubscribe:
onComplete:

fromIterable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);

Observable.fromIterable(list).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

// 输出结果:
onSubscribe:
onNext: 1, Thread[main,5,main]
onNext: 2, Thread[main,5,main]
onComplete:

timer

1
2
timer(long delay, TimeUnit unit)
timer(long delay, TimeUnit unit, Scheduler scheduler)
  • 延迟 delay 发送一个 0 和 onComplete
  • 默认在子线程发送事件,可指定发送事件所在的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

// 输入结果:
onSubscribe: Thread[main,5,main]
onNext: 0, Thread[RxComputationThreadPool-1,5,main]
onComplete: Thread[RxComputationThreadPool-1,5,main]

interval

1
2
3
4
interval(long period, TimeUnit unit)    // 每隔 period 发送一次 onNext() 事件
interval(long period, TimeUnit unit, Scheduler scheduler) // 每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程
interval(long initialDelay, long period, TimeUnit unit) // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程
  • 默认在子线程发送事件,可通过参数指定发送事件所在的线程
  • 从 0 开始无限制的发送 onNext 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + Thread.currentThread());
}

@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: " + Thread.currentThread());
}
});

intervalRange

1
2
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  • 一开始延迟 initialDelay,然后从 start 开始每隔 period 发送一次 onNext,一共发送 count 个事件,可指定发送事件所在的线程
  • 默认是在子线程发送事件,发送了 count 个 onNext 后会发送 onComplete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.intervalRange(3, 3, 0, 1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + Thread.currentThread());
}

@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: " + Thread.currentThread());
}
});

// 输出结果:
onSubscribe: Thread[main,5,main]
onNext: 3, Thread[RxComputationThreadPool-1,5,main]
onNext: 4, Thread[RxComputationThreadPool-1,5,main]
onNext: 5, Thread[RxComputationThreadPool-1,5,main]
onComplete: Thread[RxComputationThreadPool-1,5,main]

range、rangeLong

1
2
range(final int start, final int count)
rangeLong(long start, long count)
  • 从 start 开始发送 count 个 onNext,最后发送 onComplete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.range(1, 3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + Thread.currentThread());
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer + ", " + Thread.currentThread());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: " + Thread.currentThread());
}
});

// 输出结果:
onSubscribe: Thread[main,5,main]
onNext: 1, Thread[main,5,main]
onNext: 2, Thread[main,5,main]
onNext: 3, Thread[main,5,main]
onComplete: Thread[main,5,main]

zip

  • zip:将多个 Observable 发送的事件组合起来,然后再发送这个新的事件
  • 严格按照发送事件的顺序来组合新的事件
  • 下游收到的事件数量和上游发送最少的事件相同,即 observable1 发送 1 个事件,observable2 发送 2 个事件,下游会收到 1 个事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // observable1
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(i);
    }
    }).subscribeOn(Schedulers.io());

    // observable2
    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("A");
    }
    }).subscribeOn(Schedulers.io());

    // zip
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer integer, String s) throws Exception {
    return integer + s;
    }
    }).observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    Log.d(TAG, s);
    }
    }, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    Log.w(TAG, throwable);
    }
    });

sample

  • sample:每隔一定的时间从上游取一个事件发送给下游
    1
    2
    // 每隔 2 秒从上游取出一个事件发送给下游
    Observable.create(...).sample(2, TimeUnit.SECONDS)

filter

  • filter:过滤事件,符合条件的才发送到下游
    1
    2
    3
    4
    5
    6
    7
    Observable.create(...).filter(new Predicate<Object>() {
    @Override
    public boolean test(Object o) throws Exception {
    // 返回 true 才继续往下走
    return ...;
    }
    })

take

1
2
3
take(long count)    // 发送 count 个事件给下游
take(long time, TimeUnit unit) // 发送多久
take(long time, TimeUnit unit, Scheduler scheduler) // 指定线程发送多久
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 发送 3 个事件
Flowable.interval(1, TimeUnit.SECONDS)
.take(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});

// 发送 300 毫秒
Flowable.interval(1, TimeUnit.SECONDS)
.take(3000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
坚持原创技术分享,您的支持将鼓励我继续创作!