수색…
기본 예제
스케줄러는 처리 장치에 대한 RxJava 추상화입니다. 스케줄러는 Executor 서비스에 의해 백업 될 수 있지만, 자신 만의 스케줄러 구현을 구현할 수 있습니다.
Scheduler 는 다음 요구 사항을 충족해야합니다.
- 지연되지 않은 작업을 순차적으로 처리해야합니다 (FIFO 순서).
- 작업이 지연 될 수 있음
Scheduler 는 일부 연산자 (예 : delay )에서 매개 변수로 사용하거나 subscribeOn / observeOn 메소드와 함께 사용할 수 있습니다.
일부 운영자의 경우 Scheduler 는 특정 운영자의 작업을 처리하는 데 사용됩니다. 예를 들어 delay 은 다음 값을 방출하는 지연된 작업을 예약합니다. 이것은 나중에 유지하고 실행할 Scheduler 입니다.
subscribeOn 은 Observable 당 한 번 사용할 수 있습니다. 구독자의 코드가 어떤 Scheduler 에서 실행 프로그램이 될지를 정의합니다.
observeOn 은 Observable 당 여러 번 사용할 수 있습니다. observeOn 메소드 다음 에 정의 된 모든 태스크를 실행하는 데 Scheduler 가 사용될 것을 정의합니다. observeOn 은 스레드 호핑을 수행하는 데 도움이됩니다.
subscribeOn 특정 스케줄러
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
특정 스케줄러와 함께 observeOn
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
운영자와 함께 특정 스케줄러 지정
일부 운영자는 Scheduler 를 매개 변수로 사용할 수 있습니다.
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
구독자에게 게시 :
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
스레드 풀 :
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
웹 소켓 관찰 가능 :
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();
Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow