Zoeken…
Syntaxis
- Onderwerp <T, R> subject = AsyncSubject.create (); // Standaard AsyncSubject
- Onderwerp <T, R> subject = BehaviourSubject.create (); // StandaardgedragSubject
- Onderwerp <T, R> subject = PublishSubject.create (); // Standaard PublishSubject
- Onderwerp <T, R> subject = ReplaySubject.create (); // Standaard ReplaySubject
- mySafeSubject = nieuw SerializedSubject (unSafeSubject); // Converteer een onveilig onderwerp naar een veilig onderwerp - meestal voor onderwerpen met meerdere threads
parameters
| parameters | Details |
|---|---|
| T | Invoertype |
| R | Uitgangstype |
Opmerkingen
Deze documentatie geeft details en uitleg over het Subject . Raadpleeg de officiële documentatie voor meer informatie en meer informatie.
Basisonderwerpen
Een Subject in RxJava is een klasse die zowel een Observable als een Observer . Dit betekent in feite dat het kan fungeren als een Observable en invoer kan doorgeven aan abonnees en als een Observer om invoer te krijgen van een andere waarneembare.
Subject<String, String> subject = PublishSubject.create();
subject.subscribe(System.out::print);
subject.onNext("Hello, World!");
Bovenstaande afdrukken "Hallo wereld!" console gebruiken met behulp van Subjects .
Uitleg
De eerste regel code definieert een nieuw
Subjectvan het typePublishSubjectSubject<String, String> subject = PublishSubject.create(); | | | | | subject<input, output> name = default publish subjectDe tweede regel onderschrijft het onderwerp en toont het gedrag van de
Observer.subject.subscribe(System.out::print);Dit stelt de
Subjectin staat om invoer te maken zoals een normale abonneeDe derde regel roept de
onNextmethode van het onderwerp op en toont hetObservablegedrag.subject.onNext("Hello, World!");Dit stelt de
Subjectin staat om input te geven aan alle geabonneerden.
Types
Een Subject (in RxJava) kan een van deze vier typen zijn:
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
Een Subject kan ook van het type SerializedSubject . Dit type zorgt ervoor dat de Subject niet in strijd is met het Waarneembare contract (dat aangeeft dat alle oproepen serieel moeten zijn)
Verder lezen:
- Wel of niet gebruiken Onderwerp uit de blog van Dave Sexton
PublishSubject
PublishSubject zendt een Observer alleen de items die worden geëmitteerd door de bron Observable na het tijdstip van het abonnement.
Een eenvoudig voorbeeld van PublishSubject :
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
Output:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
In het bovenstaande voorbeeld wordt een PublishSubject geabonneerd op een Observable die werkt als een klok en elke 500 milli seconden items (lang) uitzendt. Zoals te zien in de uitvoer, geeft het PublishSubject de PublishSubject door die het van de bron ( clock ) krijgt aan zijn abonnees ( sub1 en sub2 ).
Een PublishSubject kan beginnen met het PublishSubject items zodra het is gemaakt, zonder enige waarnemer, waardoor het risico bestaat dat een of meer items verloren gaan totdat een waarnemer zich kan inschrijven.
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
Output:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
Merk op dat sub1 waarden uitzendt vanaf 10 . De ingevoerde vertraging van 5 seconden veroorzaakte een verlies van items. Deze kunnen niet worden gereproduceerd. Dit maakt PublishSubject wezen een Hot Observable .
Merk ook op dat als een waarnemer zich abonneert op het PublishSubject nadat hij n items heeft uitgezonden, deze n items niet voor deze waarnemer kunnen worden gereproduceerd.
Hieronder is het marmeren diagram van PublishSubject
Het PublishSubject zendt items uit naar iedereen die zich heeft geabonneerd, op elk moment voordat de onCompleted van de bron Observable wordt aangeroepen.
Als de bron Observable eindigt met een fout, PublishSubject het PublishSubject geen items uit naar volgende waarnemers, maar geeft de foutmelding van de bron Observable gewoon door.
Gebruik case
Stel dat u een toepassing wilt maken die de aandelenkoersen van een bepaald bedrijf bewaakt en deze doorstuurt naar alle klanten die erom vragen.
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
In het bovenstaande voorbeeld use case fungeert het PublishSubject als een brug om de waarden van uw server door te geven aan alle clients die zich abonneren op uw watcher .
Verder lezen:

