Recherche…


Exemples de base

Les ordonnanceurs sont une abstraction de RxJava sur l'unité de traitement. Un planificateur peut être sauvegardé par un service Executor, mais vous pouvez implémenter votre propre implémentation de planificateur.

Un Scheduler doit répondre à cette exigence:

  • Devrait traiter la tâche non retardée séquentiellement (ordre FIFO)
  • La tâche peut être retardée

Un Scheduler peut être utilisé comme paramètre dans certains opérateurs (exemple: delay ) ou utilisé avec la méthode subscribeOn / observeOn .

Avec certains opérateurs, le Scheduler sera utilisé pour traiter la tâche de l'opérateur spécifique. Par exemple, delay planifiera une tâche différée qui émettra la valeur suivante. Ceci est un Scheduler qui le conservera et l'exécutera plus tard.

Le subscribeOn peut être utilisé une fois par Observable . Il définira dans quel Scheduler le code de l'abonnement sera executer.

observeOn peut être utilisé plusieurs fois par Observable . Il définira dans quel Scheduler sera utilisé pour exécuter toutes les tâches définies après la méthode observeOn . observeOn vous aidera à effectuer des sauts de fil.

SubscribeOn Scheduler spécifique

// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println); 

observer sur planificateur spécifique

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // next tasks will be executed in the computation scheduler
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));   

Spécifier un planificateur spécifique avec un opérateur

Certains opérateurs peuvent prendre un Scheduler comme paramètre.

Observable.just(1)
          // the onNext method of the delay operator will be executed in a new thread
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

Publier sur l'abonné:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Pool de threads:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Web Socket Observable:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow