Szukaj…


Składnia

  • Temat <T, R> podmiot = AsyncSubject.create (); // Domyślny AsyncSubject
  • Temat <T, R> podmiot = BehaviorSubject.create (); // Default BehaviorSubject
  • Subject <T, R> subject = PublishSubject.create (); // Domyślny PublishSubject
  • Subject <T, R> subject = ReplaySubject.create (); // Domyślny obiekt ReplaySubject
  • mySafeSubject = new SerializedSubject (unSafeSubject); // Konwertuj unsafeSubject na safeSubject - ogólnie w przypadku tematów wielowątkowych

Parametry

Parametry Detale
T. Rodzaj wejścia
R Typ wyjścia

Uwagi

Ta dokumentacja zawiera szczegółowe informacje i objaśnienia dotyczące Subject . Więcej informacji i dalsze informacje można znaleźć w oficjalnej dokumentacji .

Przedmioty podstawowe

Subject w RxJava jest klasa, która jest zarówno Observable i Observer . Zasadniczo oznacza to, że może działać jako Observable i przekazywać dane wejściowe subskrybentom oraz jako Observer aby uzyskiwać dane wejściowe od innego obserwowalnego.

Subject<String, String> subject = PublishSubject.create(); 
subject.subscribe(System.out::print);
subject.onNext("Hello, World!"); 

Powyższe drukuje „Witaj, świecie!” do konsoli za pomocą Subjects .

Wyjaśnienie

  1. Pierwszy wiersz kodu definiuje nowy Subject typu PublishSubject

    Subject<String, String> subject = PublishSubject.create();
        |     |       |       |                 |
     subject<input, output>  name   = default publish subject
    
  2. Drugi wiersz subskrybuje temat, pokazując zachowanie Observer .

    subject.subscribe(System.out::print);
    

    Umożliwia to Subject przyjmowanie danych wejściowych jak zwykły subskrybent

  3. Trzecia linia wywołuje metodę onNext podmiotu, pokazując Observable zachowanie.

    subject.onNext("Hello, World!"); 
    

    Umożliwia to Subject przekazywanie danych wejściowych wszystkim subskrybującym go.

Rodzaje

Subject (w RxJava) może być dowolnego z tych czterech typów:

  • AsyncSubject
  • BehaviorSubject
  • PublishSubject
  • ReplaySubject

Ponadto, Subject może być typu SerializedSubject . Ten typ zapewnia, że Subject nie naruszy umowy obserwowalnej (która określa, że wszystkie połączenia muszą być serializowane)

Dalsza lektura:

PublishSubject

PublishSubject emituje do Observer tylko tych elementów, które są emitowane przez źródło Observable Po chwili subskrypcji.

Prosty przykład PublishSubject :

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

Wynik:

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

W powyższym przykładzie obiekt PublishSubject subskrybuje obiekt Observable który działa jak zegar i emituje elementy (długie) co 500 milisekund. Jak wynika z wydajnością, PublishSubject przechodzi na dolin to dostaje od źródła ( clock ) do swoich abonentów ( sub1 i sub2 ).

Obiekt PublishSubject może zacząć emitować elementy natychmiast po ich utworzeniu, bez żadnego obserwatora, co grozi utratą jednego lub więcej elementów, dopóki obserwator nie będzie mógł opalać się.

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

Wynik:

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

Zauważ, że sub1 emituje wartości zaczynające się od 10 . Wprowadzone 5 sekundowe opóźnienie spowodowało utratę przedmiotów. Nie można ich odtworzyć. Zasadniczo sprawia to, że PublishSubject jest Hot Observable .

Należy również zauważyć, że jeśli obserwator subskrybuje obiekt PublishSubject po emisji n elementów, tych n elementów nie można odtworzyć dla tego obserwatora.

Poniżej znajduje się marmurowy schemat PublishSubject

wprowadź opis zdjęcia tutaj

Obiekt PublishSubject emituje elementy do wszystkich subskrybowanych w dowolnym momencie przed onCompleted funkcji onCompleted źródła Observable .

Jeśli źródło Observable zakończy się z błędem, PublishSubject nie wyśle żadnych elementów do kolejnych obserwatorów, ale po prostu przekaże powiadomienie o błędzie ze źródła Observable.

wprowadź opis zdjęcia tutaj

Przypadek użycia
Załóżmy, że chcesz utworzyć aplikację, która będzie monitorować ceny akcji pewnej firmy i przekaże ją wszystkim klientom, którzy o nią poproszą.

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

W powyższym przykładzie użycia PublishSubject działa jak pomost do przekazywania wartości z twojego serwera wszystkim klientom, którzy subskrybują twój watcher .

Dalsza lektura:

  • PublishSubject javadocs
  • Blog Thomasa Nielda (czytanie zaawansowane)


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow