rx-java
Zauważalny
Szukaj…
Utwórz obserwowalny
Istnieje kilka sposobów na stworzenie Observable w RxJava. Najpotężniejszym sposobem jest użycie metody Observable.create . Ale to także najbardziej skomplikowany sposób . Musisz więc unikać korzystania z niego w jak największym stopniu.
Emitowanie wartości wyjściowej
Jeśli masz już wartość, możesz użyć Observable.just aby wyemitować swoją wartość.
Observable.just("Hello World").subscribe(System.out::println);
Emisja wartości, którą należy obliczyć
Jeśli chcesz wyemitować wartość, która nie jest jeszcze obliczona lub jej obliczenie może zająć dużo czasu, możesz użyć Observable.fromCallable aby wyemitować następną wartość.
Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);
longComputation() zostanie wywołana tylko wtedy, gdy zasubskrybujesz longComputation() Observable . W ten sposób obliczenia będą leniwe .
Alternatywny sposób emitowania wartości, którą należy obliczyć
Observable.defer buduje Observable podobnie jak Observable.fromCallable ale jest używany, gdy trzeba zwrócić Observable zamiast wartości. Jest to przydatne, gdy chcesz zarządzać błędami w swoim połączeniu.
Observable.defer(() -> {
try {
return Observable.just(longComputation());
} catch(SpecificException e) {
return Observable.error(e);
}).subscribe(System.out::println);
Obserwowalne na zimno i na gorąco
Obserwowalne są ogólnie klasyfikowane jako Hot lub Cold , w zależności od ich zachowania emisyjnego.
Cold Observable to taki, który zaczyna emitować na żądanie (subskrypcja), podczas gdy Hot Observable to taki, który emituje niezależnie od subskrypcji.
Zimno obserwowalne
/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
Wyjście powyższego kodu wygląda (może się różnić):
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0 -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2
Zauważ, że chociaż sub2 zaczyna się późno, otrzymuje wartości od samego początku. Podsumowując, Cold Observable emituje przedmioty tylko na żądanie. Wiele żądań uruchamia wiele potoków.
Obserwowalne na gorąco
Uwaga: Obserwowalne na gorąco emitują wartości niezależne od poszczególnych subskrypcji. Mają własną oś czasu i zdarzają się zdarzenia, niezależnie od tego, czy ktoś nas słucha, czy nie. Cold Observale można przekształcić w Hot Observable za pomocą prostej publish .
Observable.interval(500, TimeUnit.MILLISECONDS)
.publish(); // publish converts cold to hot
publish zwraca ConnectableObservable który dodaje funkcje do łączenia i odłączania od obserwowalnego.
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Powyższe daje:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
Zauważ, że chociaż sub2 zaczyna obserwować późno, jest zsynchronizowane z sub1 .
Odłączanie jest trochę bardziej skomplikowane! Rozłączanie ma miejsce w ramach Subscription a nie w przypadku Observable .
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription
System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();
Powyższe powoduje:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting -> reconnect after unsubscribe
sub1, 0
...
Po rozłączeniu Observable zasadniczo „kończy” i uruchamia się ponownie po dodaniu nowej subskrypcji.
Hot Observable można wykorzystać do utworzenia EventBus . Takie EventBuse są na ogół lekkie i super szybkie. Jedynym minusem RxBus jest to, że wszystkie zdarzenia muszą być ręcznie zaimplementowane i przekazane do magistrali.