rx-java
Планировщики
Поиск…
Основные примеры
Планировщики - это абстракция RxJava о процессоре. Планировщик может быть защищен службой Executor, но вы можете реализовать свою собственную реализацию планировщика.
Scheduler должен соответствовать этому требованию:
- Должен обрабатывать неустановленную задачу последовательно (порядок FIFO)
- Задание может быть отложено
Scheduler может использоваться как параметр для некоторых операторов (пример: delay ) или используется с методом subscribeOn / observeOn .
С помощью некоторого оператора Scheduler будет использоваться для обработки задачи конкретного оператора. Например, delay будет планировать отложенную задачу, которая испустит следующее значение. Это Scheduler , который сохранит и выполнит его позже.
subscribeOn можно использовать один раз для Observable . Он определит, в каком Scheduler будет выполняться код подписки.
observeOn может быть использовано несколько раз за Observable . Он определит, в каком Scheduler будет использоваться для выполнения всех задач, определенных после метода observeOn . observeOn поможет вам выполнить скачок резьбы.
subscribeOn конкретный планировщик
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
наблюдать со специальным планировщиком
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Указание конкретного Планировщика с оператором
Некоторые операторы могут принимать параметр Scheduler качестве параметра.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Опубликовать подписчику:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Пул потоков:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Наблюдаемый веб-разъем:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();