rx-java
Programadores
Buscar..
Ejemplos básicos
Los programadores son una abstracción de RxJava sobre la unidad de procesamiento. Un programador puede respaldar un planificador, pero puede implementar su propia implementación de planificador.
Un Scheduler debe cumplir con este requisito:
- Debería procesar la tarea no demorada secuencialmente (orden FIFO)
- La tarea puede retrasarse
Un Scheduler puede usarse como parámetro en algunos operadores (ejemplo: delay ), o puede usarse con el método subscribeOn / observeOn .
Con algún operador, el Scheduler se utilizará para procesar la tarea del operador específico. Por ejemplo, el delay programará una tarea retrasada que emitirá el siguiente valor. Este es un Scheduler que lo retendrá y ejecutará más tarde.
El subscribeOn se puede utilizar una vez por Observable . Se definirá en qué Scheduler se ejecutará el código de la suscripción.
El observeOn se puede utilizar varias veces por Observable . observeOn en qué Scheduler se utilizará para ejecutar todas las tareas definidas después del método de observeOn . observeOn le ayudará a realizar el salto de hilo.
suscribirse en el programador específico
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
observar con programador específico
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
Especificar un programador específico con un operador
Algunos operadores pueden tomar un Scheduler como parámetro.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
Publicar Para Suscriptor:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Grupo de subprocesos:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Web Socket observable:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();