rx-java
planners
Zoeken…
Basis voorbeelden
Planners zijn een RxJava-abstractie over verwerkingseenheid. Een planner kan worden ondersteund door een Executor-service, maar u kunt uw eigen plannerimplementatie implementeren.
Een Scheduler moet aan deze eis voldoen:
- Moet opeenvolgende taken achter elkaar verwerken (FIFO-volgorde)
- Taak kan worden uitgesteld
Een Scheduler kan als parameter worden gebruikt in sommige operatoren (bijvoorbeeld: delay ) of worden gebruikt met de methode subscribeOn / observeOn .
Bij sommige operators wordt de Scheduler gebruikt om de taak van de specifieke operator te verwerken. Met delay wordt bijvoorbeeld een uitgestelde taak gepland die de volgende waarde uitzendt. Dit is een Scheduler die deze later bewaart en uitvoert.
De subscribeOn kan één keer per Observable worden gebruikt. Het zal bepalen in welke Scheduler de code van het abonnement zal worden uitgevoerd.
De observeOn kan meerdere keren worden gebruikt per Observable . Het zal bepalen in welke Scheduler alle taken worden uitgevoerd die zijn gedefinieerd na de methode observeOn . observeOn helpt u bij het observeOn .
Abonneren op specifieke planner
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
observeren met specifieke planner
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Een specifieke planner opgeven bij een operator
Sommige operators kunnen een Scheduler als parameter gebruiken.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Publiceren aan abonnee:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Discussie pool:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Web Socket Observable:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();