rx-java
waarneembaar
Zoeken…
Maak een waarneembare
Er zijn verschillende manieren om een Observable te maken in RxJava. De meest krachtige manier is om de methode Observable.create gebruiken. Maar het is ook de meest gecompliceerde manier . Je moet het dus zoveel mogelijk vermijden .
Uitgaande waarde afgeven
Als je al een waarde hebt, kun je Observable.just gebruiken om je waarde uit te zenden.
Observable.just("Hello World").subscribe(System.out::println);
Een waarde uitzenden die moet worden berekend
Als u een waarde wilt uitzenden die nog niet is berekend, of die lang kan duren om te worden berekend, kunt u Observable.fromCallable gebruiken om uw volgende waarde uit te zenden.
Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);
longComputation() wordt alleen opgeroepen wanneer u zich abonneert op uw Observable . Op deze manier zal de berekening lui zijn .
Alternatieve manier om een waarde uit te geven die moet worden berekend
Observable.defer bouwt een Observable net als Observable.fromCallable maar wordt gebruikt wanneer u een Observable moet retourneren in plaats van een waarde. Dit is handig als u de fouten in uw oproep wilt beheren.
Observable.defer(() -> {
try {
return Observable.just(longComputation());
} catch(SpecificException e) {
return Observable.error(e);
}).subscribe(System.out::println);
Warm en koud observables
Observables worden globaal gecategoriseerd als Hot of Cold , afhankelijk van hun emissiegedrag.
Een Cold Observable is er een die begint op verzoek (abonnement), terwijl een Hot Observable er een is die ongeacht abonnementen uitzendt.
Koud waarneembaar
/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
De uitvoer van de bovenstaande code ziet eruit als (kan variëren):
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0 -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2
Merk op dat hoewel sub2 laat begint, het vanaf het begin waarden ontvangt. Tot slot zendt een Cold Observable alleen items uit als daarom wordt gevraagd. Meerdere aanvragen starten meerdere pijpleidingen.
Heet waarneembaar
Opmerking: Hot observables zenden waarden uit onafhankelijk van individuele abonnementen. Ze hebben hun eigen tijdlijn en gebeurtenissen vinden plaats, of iemand nu luistert of niet. Een Cold Observale kan worden omgezet in een Hot Observable met een eenvoudige publish .
Observable.interval(500, TimeUnit.MILLISECONDS)
.publish(); // publish converts cold to hot
publish rendementen een ConnectableObservable die functionaliteiten toevoegt aan te sluiten en de stekker uit het waarneembare.
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Bovenstaande levert op:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
Merk op dat hoewel sub2 laat begint te observeren, het synchroon loopt met sub1 .
Verbinding verbreken is iets ingewikkelder! Verbinding verbreken gebeurt op het Subscription en niet op het Observable .
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription
System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();
Het bovenstaande levert op:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting -> reconnect after unsubscribe
sub1, 0
...
Wanneer de verbinding wordt verbroken, wordt de Observable wezen "beëindigd" en wordt deze opnieuw gestart wanneer een nieuw abonnement wordt toegevoegd.
Hot Observable kan worden gebruikt voor het maken van een EventBus . Dergelijke EventBuses zijn over het algemeen licht en supersnel. Het enige nadeel van een RxBus is dat alle gebeurtenissen handmatig moeten worden geïmplementeerd en aan de bus moeten worden doorgegeven.