수색…


Observable 만들기

RxJava에서 Observable을 만드는 방법에는 여러 가지가 있습니다. 가장 강력한 방법은 Observable.create 메서드를 사용하는 것입니다. 그러나 또한 가장 복잡한 방법 이기도합니다. 따라서 가능한 한 많이 사용하지 않아야합니다.

기존 가치 창출

이미 값이있는 경우 Observable.just 를 사용하여 값을 방출 할 수 있습니다.

 Observable.just("Hello World").subscribe(System.out::println);

계산되어야하는 값을내는 것

이미 계산되지 않았거나 계산 시간이 오래 걸릴 수있는 값을 방출하려는 경우 Observable.fromCallable 을 사용하여 다음 값을 방출 할 수 있습니다.

Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);

longComputation()Observable 가입 할 때만 호출됩니다. 이 방법은 계산이 게을러진다 .

계산되어야하는 값을 방출하는 다른 방법

Observable.deferObservable 처럼 Observable.fromCallable 하지만 값 대신 Observable 을 반환해야하는 경우에 사용됩니다. 통화 중 오류를 관리하려는 경우에 유용합니다.

Observable.defer(() -> {
          try {
                return Observable.just(longComputation());
          } catch(SpecificException e) {
                return Observable.error(e);
          }).subscribe(System.out::println);

뜨겁고 차가운 Observables

관찰 가능한 광범위로 분류되는 Hot 또는 Cold 그들의 방출의 행동에 따라.
Cold Observable 은 요청 (구독)시 방출되기 시작하는 반면, Hot Observable 은 구독과 관계없이 방출됩니다.

냉연 관측 가능

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2

위 코드의 결과는 다음과 같이 보일 수 있습니다 (다를 수도 있음).

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

sub2 가 늦게 시작하더라도 시작부터 값을받습니다. 결론적으로 Cold Observable 은 요청시에만 아이템을 방출합니다. 다중 요청은 여러 파이프 라인을 시작합니다.

뜨거운 관측 가능

주 : 뜨거운 관측 값은 개별 구독과 관계없이 값을 방출합니다. 그들은 자신의 타임 라인을 가지고 있으며 누군가가 듣고 있든 없든 이벤트가 발생합니다.

Cold Observale 은 간단한 publishHot Observable 로 변환 할 수 있습니다.

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish 는 관측 가능 객체에 연결 하고 연결 해제 할 수있는 기능을 추가하는 ConnectableObservable 을 반환합니다.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));

위의 결과는 다음과 같습니다.

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

sub2 가 늦게 관찰되기 시작하더라도 sub1 과 동기화됩니다.
연결 해제는 좀 더 복잡합니다! 연결 해제는 Observable 아니라 Subscription 에서 발생합니다.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

위의 결과는 다음과 같습니다.

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

연결 해제되면 Observable 본질적으로 "종료"되고 새 구독이 추가 될 때 다시 시작됩니다.

Hot ObservableEventBus 를 생성하는 데 사용할 수 있습니다. 이러한 EventBus는 일반적으로 가볍고 빠릅니다. RxBus의 유일한 단점은 모든 이벤트가 수동으로 구현되어 버스로 전달되어야한다는 것입니다.



Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow