rx-java
Scheduler
Suche…
Grundlegende Beispiele
Scheduler sind eine RxJava-Abstraktion über die Verarbeitungseinheit. Ein Scheduler kann durch einen Executor-Service gesichert werden, Sie können jedoch Ihre eigene Scheduler-Implementierung implementieren.
Ein Scheduler sollte diese Anforderung erfüllen:
- Soll unverzögerte Task sequentiell abarbeiten (FIFO-Reihenfolge)
- Task kann verzögert werden
Ein Scheduler kann in einigen Operatoren als Parameter verwendet werden (Beispiel: delay ) oder mit der subscribeOn / observeOn Methode verwendet werden.
Bei einigen Bedienern wird der Scheduler dazu verwendet, die Aufgabe des jeweiligen Bedieners zu bearbeiten. delay plant beispielsweise eine verzögerte Task, die den nächsten Wert ausgeben wird. Dies ist ein Scheduler , der später beibehalten und ausgeführt wird.
Das subscribeOn kann einmal pro Observable . Es wird festgelegt, in welchem Scheduler der Code des Abonnements ausgeführt wird.
Der observeOn kann pro Observable mehrfach verwendet werden. Hier wird festgelegt, in welchem Scheduler alle nach der observeOn Methode definierten Aufgaben observeOn werden. observeOn hilft Ihnen, Thread-Hopping auszuführen.
subscribeOn bestimmter Scheduler
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
observOn mit einem bestimmten Scheduler
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Festlegen eines bestimmten Schedulers mit einem Operator
Einige Operatoren können einen Scheduler als Parameter verwenden.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Für Abonnenten veröffentlichen:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Thread-Pool:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Web Socket Observable:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();