Sök…


Grundläggande exempel

Schemaläggare är en RxJava-abstraktion om behandlingsenhet. En schemaläggare kan säkerhetskopieras av en Executor-tjänst, men du kan implementera din egen schemaläggningsimplementering.

En Scheduler ska uppfylla detta krav:

  • Bör behandla obestämda uppgifter sekventiellt (FIFO-ordning)
  • Uppgiften kan försenas

En Scheduler kan användas som parameter i vissa operatörer (exempel: delay ) eller användas med metoden subscribeOn / observeOn .

Med någon operatör kommer Scheduler att användas för att bearbeta den specifika operatörens uppgift. delay planerar till exempel en försenad uppgift som avger nästa värde. Detta är en Scheduler som behåller och kör den senare.

subscribeOn kan användas en gång per Observable . Den kommer att definiera i vilken Scheduler koden för prenumerationen ska vara exekverande.

observeOn kan användas flera gånger per Observable . Den kommer att definiera i vilken Scheduler ska användas för att utföra alla uppgifter som definieras efter observeOn . observeOn hjälper dig att utföra trådhoppning.

subscribePå en specifik schemaläggare

// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println); 

observera På med specifik Scheduler

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // next tasks will be executed in the computation scheduler
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));   

Ange en specifik schemaläggare med en operatör

Vissa operatörer kan ta en Scheduler som parameter.

Observable.just(1)
          // the onNext method of the delay operator will be executed in a new thread
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

Publicera till abonnenten:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Trådpool:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Web Socket observerbart:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow