Szukaj…


Podstawowe przykłady

Programy planujące są abstrakcją RxJava dotyczącą jednostki przetwarzającej. Harmonogram może być wspierany przez usługę Executora, ale możesz zaimplementować własną implementację harmonogramu.

Program Scheduler powinien spełniać to wymaganie:

  • Powinien przetwarzać sekwencyjnie nieopóźnione zadanie (zamówienie FIFO)
  • Zadanie może zostać opóźnione

Scheduler może być użyty jako parametr w niektórych operatorach (na przykład: delay ) lub może być użyty z metodą subscribeOn / observeOn .

W przypadku niektórych operatorów program Scheduler będzie używany do przetwarzania zadania określonego operatora. Na przykład delay zaplanuje opóźnione zadanie, które wyemituje następną wartość. Jest to program Scheduler , który zachowa i uruchomi go później.

subscribeOn można użyć raz na Observable . Określi, w którym programie Scheduler zostanie wykonany kod subskrypcji.

observeOn mogą być używane wielokrotnie za Observable . observeOn w którym Scheduler będą używane wszystkie zadania zdefiniowane po metodzie observeOn . observeOn pomoże ci przeskakiwać wątki.

zasubskrybuj określony harmonogram

// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println); 

przestrzegaj określonego harmonogramu

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // next tasks will be executed in the computation scheduler
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));   

Określanie konkretnego harmonogramu z operatorem

Niektórzy operatorzy mogą przyjąć Scheduler jako parametr.

Observable.just(1)
          // the onNext method of the delay operator will be executed in a new thread
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

Publikuj do subskrybenta:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Pula wątków:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Obserwowalne gniazdo sieciowe:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow