Szukaj…


Wprowadzenie

Ciśnienie wsteczne występuje, gdy w Observable potoku przetwarzania niektóre etapy asynchroniczne nie mogą przetworzyć wartości wystarczająco szybko i potrzebują sposobu, aby poinformować producenta o zwolnieniu.

Klasyczny przypadek potrzeby przeciwciśnienia występuje, gdy producent jest gorącym źródłem:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

W tym przykładzie główny wątek wyprodukuje 1 milion produktów dla konsumenta końcowego, który przetwarza go w wątku tła. Prawdopodobnie metoda compute(int) zajmuje trochę czasu, ale obciążenie łańcucha Observable operatorów może również wydłużyć czas potrzebny na przetworzenie elementów. Jednak produkujący wątek z pętlą for nie może o tym wiedzieć i kontynuuje onNext .

Wewnętrznie operatory asynchroniczne mają bufory do przechowywania takich elementów, dopóki nie zostaną przetworzone. W klasycznej wersji Rx.NET i wczesnej wersji RxJava bufory te były nieograniczone, co oznacza, że prawdopodobnie pomieści prawie wszystkie miliony elementów z przykładu. Problem zaczyna się, gdy na przykład jest 1 miliard elementów lub ta sama 1 milionowa sekwencja pojawia się 1000 razy w programie, co prowadzi do OutOfMemoryError i ogólnie spowalnia z powodu nadmiernego obciążenia GC.

Podobnie do tego, jak obsługa błędów stała się obywatelem pierwszej klasy i otrzymywała operatorów, aby sobie z tym poradzić (za onErrorXXX operatorów onErrorXXX ), przeciwciśnienie jest kolejną właściwością przepływów danych, o których programista musi myśleć i obsługiwać (za onBackpressureXXX operatorów onBackpressureXXX ).

Poza powyższym obiektem PublishSubject istnieją inne operatory, które nie obsługują ciśnienia wstecznego, głównie z powodów funkcjonalnych. Na przykład interval operatora okresowo emituje wartości, przy czym ciśnienie wsteczne doprowadziłoby do przesunięcia okresu w stosunku do zegara ściennego.

We współczesnym RxJava większość operatorów asynchronicznych ma teraz ograniczony bufor wewnętrzny, jak na przykład observeOn powyżej, a każda próba przepełnienia tego bufora spowoduje zakończenie całej sekwencji za pomocą MissingBackpressureException . Dokumentacja każdego operatora zawiera opis jego zachowania pod ciśnieniem wstecznym.

Jednak ciśnienie wsteczne występuje bardziej subtelnie w regularnych sekwencjach zimna (które nie dają i nie powinny dawać MissingBackpressureException ). Jeśli pierwszy przykład zostanie przepisany:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Nie ma błędu i wszystko działa płynnie przy małym zużyciu pamięci. Powodem tego jest to, że wielu operatorów źródłowych może „generować” wartości na żądanie, a zatem operator observeOn może powiedzieć, że range generuje co najwyżej tyle wartości, observeOn bufor może observeOn jednocześnie bez przepełnienia.

Negocjacje opierają się na informatycznej koncepcji wspólnych procedur (dzwonię do ciebie, dzwonisz do mnie). Operator range wysyła zwrotnego, w postaci wykonania Producer interfejs do observeOn przez wywołanie wewnętrzną ( Subscriber 'S) setProducer . W zamian, observeOn wywołuje Producer.request(n) z wartością observeOn range którym wolno produkować (tj. onNext it) tyle dodatkowych elementów. Następnie obowiązkiem observeOn jest wywołanie metody request we właściwym czasie i z odpowiednią wartością, aby utrzymać przepływ danych, ale nie przepełnienie.

Wyrażanie presji wstecznej u odbiorców końcowych jest rzadko konieczne (ponieważ są one synchroniczne w odniesieniu do ich bezpośredniego działania wyjściowego, a presja zwrotna naturalnie dzieje się z powodu blokowania stosu wywołań), ale może być łatwiej zrozumieć jej działanie:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

W tym przypadku implementacja onStart wskazuje range do wygenerowania pierwszej wartości, która jest następnie odbierana w onNext . Po zakończeniu compute(int) następna wartość jest następnie żądana z range . W naiwnej implementacji range takie wywołanie rekurencyjnie wywoływałoby onNext , co prowadziłoby do StackOverflowError co oczywiście jest niepożądane.

Aby temu zapobiec, operatorzy stosują tak zwaną logikę trampolinowania, która zapobiega takim ponownym połączeniom. W kategoriach range pamięta się, że podczas wywołania onNext() onNext() request(1) a gdy onNext() powróci, wykona kolejną rundę i onNext() z następną liczbą całkowitą. Dlatego też, jeśli oba są zamienione, przykład nadal działa tak samo:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Nie dotyczy to jednak onStart . Chociaż Observable infrastruktura gwarantuje, że zostanie wywołana co najwyżej raz dla każdego Subscriber , wezwanie do request(1) może natychmiast uruchomić emisję elementu. Jeśli po wywołaniu request(1) potrzebna jest logika inicjalizacji, która jest potrzebna przez onNext , mogą wystąpić wyjątki:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

W tym przypadku synchronicznym NullPointerException zostanie NullPointerException natychmiast podczas wykonywania programu onStart . Bardziej subtelny błąd występuje, gdy wywołanie request(1) wyzwala asynchroniczne wywołanie onNext w innym wątku i odczytywanie name w wyścigach onNext zapisujących je w request postu onStart .

Dlatego należy wykonać całą inicjalizację pola w onStart lub nawet wcześniej, a ostatnia onStart call request() . Implementacje request() w operatorach zapewniają odpowiednią relację przed zdarzeniem (lub innymi słowy, zwolnienie pamięci lub pełne ogrodzenie), gdy jest to konieczne.

Operatory onBackpressureXXX

Większość programistów napotyka na MissingBackpressureException gdy ich aplikacja kończy się niepowodzeniem z MissingBackpressureException a wyjątek zwykle wskazuje na operator observeOn . Rzeczywistą przyczyną jest zwykle użycie PublishSubject , timer() lub interval() lub niestandardowych operatorów utworzonych za pomocą PublishSubject create() .

Istnieje kilka sposobów radzenia sobie z takimi sytuacjami.

Zwiększanie rozmiarów buforów

Czasami takie przepełnienia zdarzają się z powodu źródeł pękania. Nagle użytkownik zbyt szybko stuka ekran i observeOn , że domyślny 16-elementowy bufor wewnętrzny w systemie Android przepełnia się.

Większość operatorów wrażliwych na ciśnienie wsteczne w najnowszych wersjach RxJava pozwala teraz programistom określać rozmiar swoich wewnętrznych buforów. Odpowiednie parametry są zwykle nazywane bufferSize , prefetch lub capacityHint . Biorąc pod uwagę przepełniony przykład we wstępie, możemy po prostu zwiększyć rozmiar bufora parametru observeOn aby mieć wystarczająco dużo miejsca na wszystkie wartości.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Należy jednak pamiętać, że generalnie może to być tylko tymczasowa poprawka, ponieważ przepełnienie może się nadal zdarzyć, jeśli źródło nadprodukuje prognozowany rozmiar bufora. W takim przypadku można użyć jednego z następujących operatorów.

Wartości wsadowe / pomijane ze standardowymi operatorami

W przypadku, gdy dane źródłowe mogą być przetwarzane bardziej wydajnie wsadowo, można zmniejszyć prawdopodobieństwo MissingBackpressureException , używając jednego ze standardowych operatorów przetwarzania wsadowego (według wielkości i / lub czasu).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Jeśli niektóre wartości można bezpiecznie zignorować, można użyć próbkowania (z czasem lub innego Observable) i operatorów dławienia ( throttleFirst , throttleLast , throttleWithTimeout ).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Należy jednak pamiętać, że operatorzy ci zmniejszają jedynie szybkość odbioru wartości przez dalszy rynek, a zatem mogą nadal prowadzić do MissingBackpressureException .

onBackpressureBuffer ()

Ten operator w swojej nieparametrycznej formie ponownie wprowadza nieograniczony bufor między źródłem źródłowym a operatorem dolnym. Brak ograniczeń oznacza, że tak długo, jak JVM nie zabraknie pamięci, może obsłużyć prawie każdą ilość pochodzącą ze źródła pękania.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

W tym przykładzie, observeOn ma bardzo mały rozmiar bufora, ale nie ma MissingBackpressureException ponieważ onBackpressureBuffer pochłania wszystkie miliony wartości i przekazuje małe partie, aby observeOn .

Należy jednak pamiętać, że onBackpressureBuffer zużywa swoje źródło w nieograniczony sposób, to znaczy bez stosowania do niego żadnego ciśnienia wstecznego. Powoduje to, że nawet źródło wspomagające przeciwciśnienie, takie jak range zostanie całkowicie zrealizowane.

Istnieją 4 dodatkowe przeciążenia onBackpressureBuffer

onBackpressureBuffer (int pojemność)

Jest to ograniczona wersja, która sygnalizuje BufferOverflowError w przypadku, gdy bufor osiągnie określoną pojemność.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Znaczenie tego operatora maleje, ponieważ coraz więcej operatorów pozwala teraz ustawiać swoje rozmiary buforów. Co do reszty, daje to możliwość „rozszerzenia wewnętrznego bufora” poprzez posiadanie większej liczby opcji onBackpressureBuffer niż domyślnych.

onBackpressureBuffer (pojemność int, Action0 onOverflow)

To przeciążenie wywołuje akcję (współdzieloną) na wypadek przepełnienia. Jego przydatność jest raczej ograniczona, ponieważ nie ma innych informacji o przepełnieniu niż bieżący stos wywołań.

onBackpressureBuffer (int pojemność, Action0 onOverflow, BackpressureOverflow.Strategy strategia)

To przeciążenie jest w rzeczywistości bardziej przydatne, ponieważ pozwala zdefiniować, co należy zrobić, jeśli pojemność zostanie osiągnięta. BackpressureOverflow.Strategy to interfejs, ale klasa BackpressureOverflow oferuje 4 pola statyczne z implementacjami reprezentującymi typowe działania:

  • ON_OVERFLOW_ERROR : jest to domyślne zachowanie dwóch poprzednich przeciążeń, sygnalizujące BufferOverflowException
  • ON_OVERFLOW_DEFAULT : obecnie jest taki sam jak ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : jeśli nastąpi przepełnienie, bieżąca wartość zostanie po prostu zignorowana, a tylko stare wartości zostaną dostarczone, gdy żądania zstępują.
  • ON_OVERFLOW_DROP_OLDEST : upuszcza najstarszy element w buforze i dodaje do niego bieżącą wartość.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Pamiętaj, że dwie ostatnie strategie powodują nieciągłość w strumieniu, ponieważ usuwają elementy. Ponadto nie będą sygnalizować BufferOverflowException .

onBackpressureDrop ()

Ilekroć dalszy użytkownik nie jest gotowy na otrzymywanie wartości, operator ten upuści ten element z sekwencji. Można myśleć o tym jako o pojemności 0 na onBackpressureBuffer ze strategią ON_OVERFLOW_DROP_LATEST .

Ten operator jest przydatny, gdy można bezpiecznie zignorować wartości ze źródła (takie jak ruchy myszy lub bieżące sygnały lokalizacji GPS), ponieważ później będzie więcej aktualnych wartości.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Może to być przydatne w połączeniu z interval() operatora źródła interval() . Na przykład, jeśli chcesz wykonać jakieś okresowe zadanie w tle, ale każda iteracja może trwać dłużej niż ten okres, możesz bezpiecznie usunąć powiadomienie o przekroczeniu interwału, ponieważ później będzie więcej:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Istnieje jedno przeciążenie tego operatora: onBackpressureDrop(Action1<? super T> onDrop) którym wywoływana jest (wspólna) akcja z upuszczaną wartością. Ten wariant pozwala samodzielnie wyczyścić wartości (np. Uwolnić powiązane zasoby).

onBackpressureLatest ()

Ostateczny operator zachowuje tylko najnowszą wartość i praktycznie zastępuje starsze, niedostarczone wartości. Można myśleć o tym jako o wariancie onBackpressureBuffer o pojemności 1 i strategii ON_OVERFLOW_DROP_OLDEST .

W przeciwieństwie do onBackpressureDrop zawsze istnieje wartość dostępna do konsumpcji, jeśli onBackpressureDrop że onBackpressureDrop . Może to być przydatne w niektórych sytuacjach podobnych do telemetrii, w których dane mogą mieć pewien wzorzec, ale tylko najnowsze są interesujące do przetwarzania.

Na przykład, jeśli użytkownik dużo kliknie na ekranie, nadal chcielibyśmy zareagować na jego najnowsze dane wejściowe.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Zastosowanie onBackpressureDrop w tym przypadku doprowadziłoby do sytuacji, w której ostatnie kliknięcie zostanie upuszczone i pozostawia użytkownika zastanawiającego się, dlaczego logika biznesowa nie została wykonana.

Tworzenie źródeł danych pod ciśnieniem

Tworzenie źródeł danych z przeciwciśnieniem jest stosunkowo łatwiejszym zadaniem w przypadku ogólnego przeciwciśnienia, ponieważ biblioteka już oferuje statyczne metody w Observable które obsługują przeciwciśnienie dla programisty. Możemy wyróżnić dwa rodzaje metod fabrycznych: zimne „generatory”, które albo zwracają i generują elementy na podstawie popytu na niższym poziomie, i gorące „popychacze”, które zwykle łączą niereaktywne i / lub nie poddające się ciśnieniu źródła danych i nakładają pewne funkcje przeciwciśnienia na im.

właśnie

Najbardziej podstawowym źródłem ciśnienia wstecznego świadomy jest tworzony poprzez just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Ponieważ wyraźnie nie onStart w onStart , nic nie wydrukuje. just świetnie, gdy istnieje stała wartość, którą chcielibyśmy szybko uruchomić sekwencję.

Niestety, just często mylony jest ze sposobem dynamicznego obliczania czegoś, co zostanie wykorzystane przez Subscriber :

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Zaskakujące dla niektórych jest to, że drukuje 1 dwukrotnie zamiast odpowiednio 1 i 2. Jeśli połączenie zostanie przepisane, staje się oczywiste, dlaczego tak działa:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue jest wywoływana jako część głównej procedury, a nie w odpowiedzi na subskrybentów subskrybujących.

fromCallable

To, czego ludzie tak naprawdę potrzebują, to metoda fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Tutaj computeValue jest wykonywane tylko wtedy, gdy subskrybent subskrybuje i dla każdego z nich, drukuje oczekiwane 1 i 2. Oczywiście, fromCallable również poprawnie obsługuje przeciwciśnienie i nie wyemituje obliczonej wartości, chyba że zostanie o to poproszony. Zauważ jednak, że obliczenia i tak się zdarzają. W przypadku, gdy samo obliczenie powinno zostać opóźnione do momentu, w którym żądane oprogramowanie faktycznie zażąda, możemy użyć just z map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just nie wyemituje swojej stałej wartości, dopóki nie zostanie o to poproszony, gdy jest odwzorowany na wynik computeValue , wciąż wywoływany dla każdego subskrybenta indywidualnie.

od

Jeśli dane są już dostępne jako tablica obiektów, lista obiektów lub dowolne źródło Iterable , odpowiednie from przeciążeń obsłuży przeciwciśnienie i emisję takich źródeł:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Dla wygody (i uniknięcia tworzenia macierzy ostrzeżenia o ogólne) jest od 2 do 10 przeciążenia argumentem just że wewnątrz przekazać from .

Od from(Iterable) daje również interesującą okazję. Wiele generowania wartości można wyrazić w postaci maszyny stanów. Każdy żądany element wyzwala przejście stanu i obliczenie zwróconej wartości.

Pisanie takich maszyn stanowych jak Iterable s jest nieco skomplikowane (ale nadal łatwiejsze niż pisanie Observable do ich konsumpcji) iw przeciwieństwie do C #, Java nie ma żadnego wsparcia z kompilatora do budowania takich maszyn stanowych po prostu pisząc klasycznie wyglądający kod (z yield return i yield break ). Niektóre biblioteki oferują pomoc, na przykład AbstractIterable Google Guava oraz Ix.generate() i Ix.forloop() i Ix.forloop() . Są one same w sobie godne pełnej serii, więc zobaczmy bardzo podstawowe źródło Iterable które bez końca powtarza stałą wartość:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Gdybyśmy zużyli iterator za pomocą klasycznej pętli for, spowodowałoby to nieskończoną pętlę. Ponieważ budujemy z niego Observable , możemy wyrazić naszą wolę zużywania tylko pierwszych 5 z nich, a następnie przestać o nic prosić. To jest prawdziwa moc leniwej oceny i obliczeń w Observable .

Utwórz (SyncOnSubscribe)

Czasami źródło danych, które należy przekonwertować na sam świat reaktywny, jest synchroniczne (blokujące) i podobne do ściągania, co oznacza, że musimy wywołać metodę get lub read , aby uzyskać następny kawałek danych. Można oczywiście przekształcić to w Iterable ale gdy takie źródła są powiązane z zasobami, możemy wyciec te zasoby, jeśli dalszy kanał Iterable sekwencję przed jej zakończeniem.

Aby obsłużyć takie przypadki, RxJava ma klasę SyncOnSubscribe . Można go rozszerzyć i wdrożyć jego metody lub użyć jednej z metod fabrycznych opartych na lambda do zbudowania instancji.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

Ogólnie SyncOnSubscribe używa 3 wywołań zwrotnych.

Pierwsze wywołania zwrotne umożliwiają utworzenie stanu dla subskrybenta, takiego jak FileInputStream w przykładzie; plik zostanie otwarty niezależnie dla każdego subskrybenta.

Drugie wywołanie zwrotne przyjmuje ten obiekt stanu i udostępnia wyjściowy Observer którego metody onXXX można wywoływać w celu emitowania wartości. To wywołanie zwrotne jest wykonywane tyle razy, ile zażądano dalszego użytkownika. Przy każdym wywołaniu musi co najwyżej raz wywołać onNext a następnie onError lub onCompleted . W tym przykładzie wywołujemy funkcję onCompleted() jeśli odczytany bajt jest ujemny, co oznacza koniec pliku i wywołuje onError w przypadku, gdy odczyt zgłosi onError IOException .

Ostateczne wywołanie zwrotne jest wywoływane, gdy dalszy kanał wypisuje się (zamykając strumień wejściowy) lub gdy poprzednie wywołanie zwrotne wywoływało metody terminalowe; pozwala uwolnić zasoby. Ponieważ nie wszystkie źródła potrzebują wszystkich tych funkcji, statyczne metody SyncOnSubscribe pozwalają stworzyć instancje bez nich.

Niestety wiele wywołań metod w JVM i innych bibliotekach generuje sprawdzone wyjątki i należy je zapakować w try-catch es, ponieważ interfejsy funkcjonalne używane przez tę klasę nie pozwalają na zgłaszanie sprawdzonych wyjątków.

Oczywiście możemy naśladować inne typowe źródła, takie jak nieograniczony zakres:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

W tym ustawieniu current zaczyna się od 0 a przy następnym wywołaniu lambda parametr current ma teraz wartość 1 .

Istnieje wariant SyncOnSubscribe nazwie AsyncOnSubscribe który wygląda dość podobnie, z tym wyjątkiem, że środkowe wywołanie zwrotne również przyjmuje długą wartość, która reprezentuje kwotę żądania z późniejszego AsyncOnSubscribe , a wywołanie zwrotne powinno generować wartość Observable o dokładnie tej samej długości. To źródło następnie łączy wszystkie te Observable w jedną sekwencję.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Trwa (gorąca) dyskusja na temat przydatności tej klasy i generalnie nie jest zalecana, ponieważ rutynowo łamie oczekiwania dotyczące tego, jak faktycznie wyemituje wygenerowane wartości i jak zareaguje, a nawet jakiego rodzaju wartości żądań otrzyma w bardziej złożone scenariusze konsumenckie.

Utwórz (emiter)

Czasami źródło, które ma być opakowane w Observable jest już gorące (takie jak ruchy myszy) lub zimne, ale nie ma nadciśnienia w jego interfejsie API (takim jak asynchroniczne wywołanie zwrotne w sieci).

Aby poradzić sobie z takimi przypadkami, najnowsza wersja RxJava wprowadziła metodę fabryczną create(emitter) . Wymaga dwóch parametrów:

  • wywołanie zwrotne, które zostanie wywołane z instancją interfejsu Emitter<T> dla każdego przychodzącego subskrybenta,
  • wyliczenie Emitter.BackpressureMode które upoważnia programistę do określenia zachowania przeciwciśnienia, które należy zastosować. Ma zwykłe tryby, podobne do onBackpressureXXX oprócz sygnalizowania MissingBackpressureException lub po prostu ignorowania takiego przepełnienia.

Zauważ, że obecnie nie obsługuje dodatkowych parametrów dla tych trybów przeciwciśnienia. Jeśli ktoś potrzebuje tych dostosowań, należy użyć opcji NONE jako trybu przeciwciśnienia i zastosować odpowiedni parametr onBackpressureXXX na wynikowym Observable .

Pierwszy typowy przypadek jego użycia, gdy chce się wchodzić w interakcje ze źródłem opartym na wypychaniu, takim jak zdarzenia GUI. API te posiadają pewne formy addListener / removeListener nazywa, że można wykorzystać:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter jest stosunkowo prosty w użyciu; można na nim wywołać onNext , onError i onCompleted , a operator samodzielnie zarządza ciśnieniem wstecznym i zarządzaniem rezygnacją z subskrypcji. Ponadto, jeśli opakowany interfejs API obsługuje anulowanie (takie jak usunięcie detektora w przykładzie), można użyć setCancellation (lub setSubscription dla zasobów podobnych do Subscription ), aby zarejestrować wywołanie zwrotne anulowania, które zostanie wywołane, gdy dalszy onError lub onError / onCompleted jest wywoływana w podanej instancji Emitter .

Te metody pozwalają na skojarzenie tylko jednego zasobu z emiterem na raz, a ustawienie nowego automatycznie anuluje subskrypcję starego. Jeśli trzeba obsłużyć wiele zasobów, utwórz CompositeSubscription , powiąż ją z emiterem, a następnie dodaj kolejne zasoby do samego CompositeSubscription :

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Drugi scenariusz zazwyczaj wymaga asynchronicznego interfejsu API opartego na wywołaniu zwrotnym, który należy przekonwertować na Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

W takim przypadku delegacja działa w ten sam sposób. Niestety, te klasyczne interfejsy API w stylu wywołania zwrotnego nie obsługują anulowania, ale jeśli tak, można skonfigurować anulowanie tak, jak w poprzednich przykładach (być może z bardziej zaangażowanym sposobem). Uwaga korzystanie z LATEST trybie zwrotnym; jeśli wiemy, że będzie tylko jedna wartość, nie potrzebujemy strategii BUFFER , ponieważ alokuje ona domyślny bufor o długości 128 elementów (który rośnie w miarę potrzeby), który nigdy nie zostanie w pełni wykorzystany.



Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow