수색…
Observable 만들기
RxJava에서 Observable을 만드는 방법에는 여러 가지가 있습니다. 가장 강력한 방법은 Observable.create 메서드를 사용하는 것입니다. 그러나 또한 가장 복잡한 방법 이기도합니다. 따라서 가능한 한 많이 사용하지 않아야합니다.
기존 가치 창출
이미 값이있는 경우 Observable.just 를 사용하여 값을 방출 할 수 있습니다.
Observable.just("Hello World").subscribe(System.out::println);
계산되어야하는 값을내는 것
이미 계산되지 않았거나 계산 시간이 오래 걸릴 수있는 값을 방출하려는 경우 Observable.fromCallable 을 사용하여 다음 값을 방출 할 수 있습니다.
Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);
longComputation() 은 Observable 가입 할 때만 호출됩니다. 이 방법은 계산이 게을러진다 .
계산되어야하는 값을 방출하는 다른 방법
Observable.defer 는 Observable 처럼 Observable.fromCallable 하지만 값 대신 Observable 을 반환해야하는 경우에 사용됩니다. 통화 중 오류를 관리하려는 경우에 유용합니다.
Observable.defer(() -> {
try {
return Observable.just(longComputation());
} catch(SpecificException e) {
return Observable.error(e);
}).subscribe(System.out::println);
뜨겁고 차가운 Observables
관찰 가능한 광범위로 분류되는 Hot 또는 Cold 그들의 방출의 행동에 따라.
Cold Observable 은 요청 (구독)시 방출되기 시작하는 반면, Hot Observable 은 구독과 관계없이 방출됩니다.
냉연 관측 가능
/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
위 코드의 결과는 다음과 같이 보일 수 있습니다 (다를 수도 있음).
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0 -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2
sub2 가 늦게 시작하더라도 시작부터 값을받습니다. 결론적으로 Cold Observable 은 요청시에만 아이템을 방출합니다. 다중 요청은 여러 파이프 라인을 시작합니다.
뜨거운 관측 가능
주 : 뜨거운 관측 값은 개별 구독과 관계없이 값을 방출합니다. 그들은 자신의 타임 라인을 가지고 있으며 누군가가 듣고 있든 없든 이벤트가 발생합니다. Cold Observale 은 간단한 publish 로 Hot Observable 로 변환 할 수 있습니다.
Observable.interval(500, TimeUnit.MILLISECONDS)
.publish(); // publish converts cold to hot
publish 는 관측 가능 객체에 연결 하고 연결 해제 할 수있는 기능을 추가하는 ConnectableObservable 을 반환합니다.
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
위의 결과는 다음과 같습니다.
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
sub2 가 늦게 관찰되기 시작하더라도 sub1 과 동기화됩니다.
연결 해제는 좀 더 복잡합니다! 연결 해제는 Observable 아니라 Subscription 에서 발생합니다.
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription
System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();
위의 결과는 다음과 같습니다.
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting -> reconnect after unsubscribe
sub1, 0
...
연결 해제되면 Observable 본질적으로 "종료"되고 새 구독이 추가 될 때 다시 시작됩니다.
Hot Observable 은 EventBus 를 생성하는 데 사용할 수 있습니다. 이러한 EventBus는 일반적으로 가볍고 빠릅니다. RxBus의 유일한 단점은 모든 이벤트가 수동으로 구현되어 버스로 전달되어야한다는 것입니다.