Szukaj…
Składnia
- Temat <T, R> podmiot = AsyncSubject.create (); // Domyślny AsyncSubject
- Temat <T, R> podmiot = BehaviorSubject.create (); // Default BehaviorSubject
- Subject <T, R> subject = PublishSubject.create (); // Domyślny PublishSubject
- Subject <T, R> subject = ReplaySubject.create (); // Domyślny obiekt ReplaySubject
- mySafeSubject = new SerializedSubject (unSafeSubject); // Konwertuj unsafeSubject na safeSubject - ogólnie w przypadku tematów wielowątkowych
Parametry
| Parametry | Detale |
|---|---|
| T. | Rodzaj wejścia |
| R | Typ wyjścia |
Uwagi
Ta dokumentacja zawiera szczegółowe informacje i objaśnienia dotyczące Subject . Więcej informacji i dalsze informacje można znaleźć w oficjalnej dokumentacji .
Przedmioty podstawowe
Subject w RxJava jest klasa, która jest zarówno Observable i Observer . Zasadniczo oznacza to, że może działać jako Observable i przekazywać dane wejściowe subskrybentom oraz jako Observer aby uzyskiwać dane wejściowe od innego obserwowalnego.
Subject<String, String> subject = PublishSubject.create();
subject.subscribe(System.out::print);
subject.onNext("Hello, World!");
Powyższe drukuje „Witaj, świecie!” do konsoli za pomocą Subjects .
Wyjaśnienie
Pierwszy wiersz kodu definiuje nowy
SubjecttypuPublishSubjectSubject<String, String> subject = PublishSubject.create(); | | | | | subject<input, output> name = default publish subjectDrugi wiersz subskrybuje temat, pokazując zachowanie
Observer.subject.subscribe(System.out::print);Umożliwia to
Subjectprzyjmowanie danych wejściowych jak zwykły subskrybentTrzecia linia wywołuje metodę
onNextpodmiotu, pokazującObservablezachowanie.subject.onNext("Hello, World!");Umożliwia to
Subjectprzekazywanie danych wejściowych wszystkim subskrybującym go.
Rodzaje
Subject (w RxJava) może być dowolnego z tych czterech typów:
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
Ponadto, Subject może być typu SerializedSubject . Ten typ zapewnia, że Subject nie naruszy umowy obserwowalnej (która określa, że wszystkie połączenia muszą być serializowane)
Dalsza lektura:
- Używać lub nie używać przedmiotu z bloga Dave'a Sextona
PublishSubject
PublishSubject emituje do Observer tylko tych elementów, które są emitowane przez źródło Observable Po chwili subskrypcji.
Prosty przykład PublishSubject :
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
Wynik:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
W powyższym przykładzie obiekt PublishSubject subskrybuje obiekt Observable który działa jak zegar i emituje elementy (długie) co 500 milisekund. Jak wynika z wydajnością, PublishSubject przechodzi na dolin to dostaje od źródła ( clock ) do swoich abonentów ( sub1 i sub2 ).
Obiekt PublishSubject może zacząć emitować elementy natychmiast po ich utworzeniu, bez żadnego obserwatora, co grozi utratą jednego lub więcej elementów, dopóki obserwator nie będzie mógł opalać się.
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
Wynik:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
Zauważ, że sub1 emituje wartości zaczynające się od 10 . Wprowadzone 5 sekundowe opóźnienie spowodowało utratę przedmiotów. Nie można ich odtworzyć. Zasadniczo sprawia to, że PublishSubject jest Hot Observable .
Należy również zauważyć, że jeśli obserwator subskrybuje obiekt PublishSubject po emisji n elementów, tych n elementów nie można odtworzyć dla tego obserwatora.
Poniżej znajduje się marmurowy schemat PublishSubject
Obiekt PublishSubject emituje elementy do wszystkich subskrybowanych w dowolnym momencie przed onCompleted funkcji onCompleted źródła Observable .
Jeśli źródło Observable zakończy się z błędem, PublishSubject nie wyśle żadnych elementów do kolejnych obserwatorów, ale po prostu przekaże powiadomienie o błędzie ze źródła Observable.
Przypadek użycia
Załóżmy, że chcesz utworzyć aplikację, która będzie monitorować ceny akcji pewnej firmy i przekaże ją wszystkim klientom, którzy o nią poproszą.
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
W powyższym przykładzie użycia PublishSubject działa jak pomost do przekazywania wartości z twojego serwera wszystkim klientom, którzy subskrybują twój watcher .
Dalsza lektura:

