rx-java
schedulatori
Ricerca…
Esempi di base
Gli scheduler sono un'astrazione RxJava sull'unità di elaborazione. Un programma di pianificazione può essere supportato da un servizio Executor, ma è possibile implementare la propria implementazione dello scheduler.
Uno Scheduler dovrebbe soddisfare questo requisito:
- Dovrebbe elaborare in sequenza l'attività non ritardata (ordine FIFO)
- L'attività può essere ritardata
Un Scheduler può essere utilizzato come parametro in alcuni operatori (esempio: delay ) o utilizzato con il metodo subscribeOn / observeOn .
Con qualche operatore, lo Scheduler verrà utilizzato per elaborare l'attività dell'operatore specifico. Ad esempio, il delay programmerà un'attività ritardata che emetterà il valore successivo. Questo è uno Scheduler che lo manterrà ed eseguirà in seguito.
Il subscribeOn può essere utilizzato una volta per Observable . Definirà in quale Scheduler il codice dell'abbonamento sarà eseguito.
observeOn può essere utilizzato più volte per Observable . observeOn in quale Scheduler verrà utilizzato per eseguire tutte le attività definite dopo il metodo observeOn . observeOn ti aiuterà a eseguire il thread hopping.
iscriviti a uno specifico programmatore
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
observOn con uno Scheduler specifico
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Specifica di uno Schedulatore specifico con un operatore
Alcuni operatori possono prendere uno Scheduler come parametro.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Pubblica nell'iscritto:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Pool di thread:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Web Socket Observable:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();