サーチ…
基本的な例
スケジューラは、処理単位に関するRxJava抽象化です。スケジューラーはExecutorサービスによってバックアップできますが、独自のスケジューラー実装を実装できます。
Schedulerはこの要件を満たす必要があります。
- 遅延のないタスクをシーケンシャルに処理する必要があります(FIFO順序)
- タスクが遅れることがあります
Schedulerは、一部の演算子(例: delay )のパラメータとして使用することも、 subscribeOn / observeOnメソッドとともに使用することもできます。
一部のオペレータでは、 Schedulerが特定のオペレータのタスクを処理するために使用されます。たとえば、 delayは次の値を出力する遅延タスクをスケジュールします。これは後で保持して実行するSchedulerです。
subscribeOnはObservableごとに1回使用できます。これは、サブスクリプションのコードが実行者になるSchedulerを定義します。
observeOnはObservableごとに複数回使用できます。これは、 observeOnメソッドの後に定義されたすべてのタスクの実行にどのSchedulerが使用されるかを定義します。 observeOnはスレッドホッピングを実行するのに役立ちます。
subscribeOn特定スケジューラ
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
特定のスケジューラでobserveOn
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
演算子を使用して特定のスケジューラを指定する
一部の演算子は、 Schedulerをパラメータとして使用できます。
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
購読者に公開する:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
スレッドプール:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
観察可能なWebソケット:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();
Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow