rx-java
Harmonogramy
Szukaj…
Podstawowe przykłady
Programy planujące są abstrakcją RxJava dotyczącą jednostki przetwarzającej. Harmonogram może być wspierany przez usługę Executora, ale możesz zaimplementować własną implementację harmonogramu.
Program Scheduler powinien spełniać to wymaganie:
- Powinien przetwarzać sekwencyjnie nieopóźnione zadanie (zamówienie FIFO)
- Zadanie może zostać opóźnione
Scheduler może być użyty jako parametr w niektórych operatorach (na przykład: delay ) lub może być użyty z metodą subscribeOn / observeOn .
W przypadku niektórych operatorów program Scheduler będzie używany do przetwarzania zadania określonego operatora. Na przykład delay zaplanuje opóźnione zadanie, które wyemituje następną wartość. Jest to program Scheduler , który zachowa i uruchomi go później.
subscribeOn można użyć raz na Observable . Określi, w którym programie Scheduler zostanie wykonany kod subskrypcji.
observeOn mogą być używane wielokrotnie za Observable . observeOn w którym Scheduler będą używane wszystkie zadania zdefiniowane po metodzie observeOn . observeOn pomoże ci przeskakiwać wątki.
zasubskrybuj określony harmonogram
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
przestrzegaj określonego harmonogramu
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Określanie konkretnego harmonogramu z operatorem
Niektórzy operatorzy mogą przyjąć Scheduler jako parametr.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Publikuj do subskrybenta:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Pula wątków:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Obserwowalne gniazdo sieciowe:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();