Ricerca…


Esempi di base

Gli scheduler sono un'astrazione RxJava sull'unità di elaborazione. Un programma di pianificazione può essere supportato da un servizio Executor, ma è possibile implementare la propria implementazione dello scheduler.

Uno Scheduler dovrebbe soddisfare questo requisito:

  • Dovrebbe elaborare in sequenza l'attività non ritardata (ordine FIFO)
  • L'attività può essere ritardata

Un Scheduler può essere utilizzato come parametro in alcuni operatori (esempio: delay ) o utilizzato con il metodo subscribeOn / observeOn .

Con qualche operatore, lo Scheduler verrà utilizzato per elaborare l'attività dell'operatore specifico. Ad esempio, il delay programmerà un'attività ritardata che emetterà il valore successivo. Questo è uno Scheduler che lo manterrà ed eseguirà in seguito.

Il subscribeOn può essere utilizzato una volta per Observable . Definirà in quale Scheduler il codice dell'abbonamento sarà eseguito.

observeOn può essere utilizzato più volte per Observable . observeOn in quale Scheduler verrà utilizzato per eseguire tutte le attività definite dopo il metodo observeOn . observeOn ti aiuterà a eseguire il thread hopping.

iscriviti a uno specifico programmatore

// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println); 

observOn con uno Scheduler specifico

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // next tasks will be executed in the computation scheduler
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));   

Specifica di uno Schedulatore specifico con un operatore

Alcuni operatori possono prendere uno Scheduler come parametro.

Observable.just(1)
          // the onNext method of the delay operator will be executed in a new thread
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

Pubblica nell'iscritto:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Pool di thread:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Web Socket Observable:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow