Suche…


Einführung

Gegendruck ist , wenn in einer Observable Verarbeitungspipeline, einige asynchronen Phasen nicht die Werte schnell genug verarbeiten können und brauchen einen Weg , um die Upstream - Hersteller zu sagen zu verlangsamen.

Der klassische Fall der Notwendigkeit eines Rückstaus ist, wenn der Hersteller eine heiße Quelle ist:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

In diesem Beispiel erzeugt der Haupt-Thread 1 Million Artikel für einen Endverbraucher, der ihn in einem Hintergrund-Thread verarbeitet. Es ist wahrscheinlich, dass die Methode compute(int) einige Zeit in compute(int) nimmt, der Overhead der Operatorkette Observable kann jedoch auch die Zeit für die Verarbeitung von Elementen erhöhen. Der produzierende Thread mit der for-Schleife kann dies jedoch nicht wissen und bleibt auf der onNext .

Intern verfügen asynchrone Operatoren über Puffer, um solche Elemente zu speichern, bis sie verarbeitet werden können. Im klassischen Rx.NET und frühen RxJava waren diese Puffer unbegrenzt, was bedeutet, dass sie wahrscheinlich fast alle 1 Million Elemente des Beispiels enthalten würden. Das Problem beginnt, wenn beispielsweise 1 Milliarde Elemente vorhanden sind oder die gleiche 1-Millionen-Sequenz 1000-mal in einem Programm vorkommt, was zu OutOfMemoryError und generell zu OutOfMemoryError aufgrund von übermäßigem GC-Overhead führt.

Ähnlich wie die Fehlerbehandlung zu einem erstklassigen Bürger wurde und Operatoren zur Bearbeitung erhielt (über onErrorXXX Operatoren), ist onErrorXXX eine weitere Eigenschaft von Datenflüssen, über die der Programmierer (über onBackpressureXXX Operatoren) nachdenken und sie handhaben onBackpressureXXX .

Neben dem PublishSubject genannten PublishSubject gibt es andere Operatoren, die den Gegendruck hauptsächlich aus funktionalen Gründen nicht unterstützen. Zum Beispiel kann das Bediener interval emittiert periodisch Werte, es backpressuring führen würde , in der Zeit relativ zu einer Wanduhr zu verschieben.

In modernen RxJava verfügen die meisten asynchronen Operatoren jetzt über einen begrenzten internen Puffer, wie beispielsweise observeOn oben. Jeder Versuch, diesen Puffer zu überlaufen, beendet die gesamte Sequenz mit MissingBackpressureException . Die Dokumentation jedes Bedieners enthält eine Beschreibung seines Gegendruckverhaltens.

In normalen kalten Abläufen ist der Gegendruck jedoch subtiler vorhanden (was MissingBackpressureException nicht ergeben sollte und sollte). Wenn das erste Beispiel neu geschrieben wird:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Es gibt keinen Fehler und alles läuft reibungslos mit wenig Speicherbedarf. Der Grund dafür ist, dass viele observeOn Werte auf Anforderung "generieren" können, und der Operator observeOn kann dem range mitteilen, dass höchstens so viele Werte generiert werden, die der observeOn Puffer ohne Überlauf auf einmal halten kann.

Diese Verhandlung basiert auf dem Informatikkonzept der Co-Routinen (ich rufe Sie an, Sie rufen mich an). Der Bediener range sendet einen Rückruf, in Form einer Implementierung der Producer - Schnittstelle, an die observeOn durch ihren (inneren Aufruf Subscriber ‚s) setProducer . Im Gegenzug ruft das observeOn Producer.request(n) mit einem Wert auf, mit dem der range wird, den es erzeugen darf (dh onNext ), und zwar viele zusätzliche Elemente. Es ist dann die observeOn , die request zum richtigen Zeitpunkt und mit dem richtigen Wert aufzurufen, damit die Daten fließen, aber nicht überlaufen.

Das Ausdrücken eines Rückdrucks bei Endverbrauchern ist selten notwendig (weil sie in Bezug auf ihren unmittelbaren Upstream synchron sind, und der Rückdruck geschieht natürlich aufgrund der Blockierung der Aufrufstapel), es kann jedoch einfacher sein, die Funktionsweise des Endverbrauchers zu verstehen:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Die onStart Implementierung gibt hier den range an, um den ersten Wert zu erzeugen, der dann in onNext . Sobald die compute(int) , wird der andere Wert aus dem range angefordert. In einer naiven Implementierung range würde eine solche Aufforderung rekursiv rufen onNext , an führende StackOverflowError was natürlich unerwünscht ist .

Um dies zu verhindern, verwenden Operatoren eine sogenannte Trampolin-Logik, die solche wiedereintrittsbedingten Aufrufe verhindert. In range wird daran erinnert, dass während des Aufrufs von onNext() ein request(1) onNext() Sobald onNext() zurückkehrt, wird eine weitere Runde ausgeführt und onNext() mit dem nächsten Ganzzahlwert onNext() . Wenn also beide ausgetauscht werden, funktioniert das Beispiel immer noch gleich:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Dies gilt jedoch nicht für onStart . Obwohl die Observable Infrastruktur garantiert wird es höchstens einmal auf jedem heißen Subscriber , der Anruf zu request(1) kann direkt die Emission eines Elements auslösen. Wenn nach dem Aufruf von request(1) eine Initialisierungslogik vorhanden ist, die von onNext benötigt wird, kann es zu Ausnahmen kommen:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

In diesem synchronen Fall wird sofort eine NullPointerException ausgelöst, während onStart noch ausgeführt onStart . Ein subtilerer Fehler tritt auf, wenn der Aufruf von request(1) einen asynchronen Aufruf von onNext in einem anderen Thread auslöst und den name in onNext Races onNext , indem er in der onStart post- request .

Daher sollte man die gesamte onStart in onStart oder sogar davor durchführen und request() zuletzt aufrufen. Implementierungen von request() in Operatoren stellen sicher, dass bei Bedarf eine ordnungsgemäße Beziehung vor dem Ereignis (oder in anderen Begriffen: Speicherfreigabe oder vollständiger Zaun) besteht.

Die onBackpressureXXX-Operatoren

Die meisten Entwickler stoßen auf einen Gegendruck, wenn ihre Anwendung mit MissingBackpressureException und die Ausnahme normalerweise auf den observeOn Operator observeOn . Die eigentliche Ursache ist normalerweise die nicht- PublishSubject Verwendung von PublishSubject , timer() oder interval() oder benutzerdefinierten Operatoren, die mit create() .

Es gibt verschiedene Möglichkeiten, mit solchen Situationen umzugehen.

Vergrößern der Puffergrößen

In manchen Fällen entstehen solche Überläufe aufgrund von bersten Quellen. Plötzlich tippt der Benutzer zu schnell auf den Bildschirm und observeOn bei Android-Überläufen den standardmäßigen internen 16-Element-Puffer von On.

Die meisten gegen den Druckausdruck empfindlichen Operatoren in den aktuellen Versionen von RxJava ermöglichen es Programmierern nun, die Größe ihrer internen Puffer anzugeben. Die relevanten Parameter werden üblicherweise als bufferSize , prefetch oder capacityHint . In Anbetracht des überfließenden Beispiels in der Einführung können wir die Puffergröße von observeOn einfach erhöhen, um genügend Platz für alle Werte zu haben.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Beachten Sie jedoch, dass dies im Allgemeinen nur eine temporäre Lösung ist, da der Überlauf immer noch auftreten kann, wenn die Quelle die vorhergesagte Puffergröße überproduziert. In diesem Fall kann einer der folgenden Operatoren verwendet werden.

Batching / Überspringen von Werten mit Standardoperatoren

MissingBackpressureException die Quelldaten im Stapel effizienter verarbeitet werden können, können Sie die Wahrscheinlichkeit von MissingBackpressureException verringern, indem Sie einen der standardmäßigen Stapeloperatoren (nach Größe und / oder Zeit) verwenden.

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Wenn einige der Werte sicher ignoriert werden können, können Sie die Sampling-Funktion (mit der Zeit oder ein anderes Observable) und die Throttling-Operatoren ( throttleFirst , throttleLast , throttleWithTimeout ) verwenden.

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Beachten Sie jedoch, dass diese Operatoren nur die Rate des Empfangens von Werten durch den Downstream reduzieren und daher zu MissingBackpressureException führen MissingBackpressureException .

onBackpressureBuffer ()

Dieser Operator führt in seiner parameterlosen Form einen unbegrenzten Puffer zwischen der vorgelagerten Quelle und dem nachgelagerten Operator ein. Unbegrenzt sein bedeutet, solange die JVM nicht über genügend Arbeitsspeicher verfügt, kann sie mit fast jeder Menge umgehen, die aus einer quälenden Quelle kommt.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

In diesem Beispiel geht das observeOn mit einer sehr geringen Puffergröße MissingBackpressureException gibt jedoch keine MissingBackpressureException da onBackpressureBuffer alle 1 Million Werte onBackpressureBuffer und kleine Batches davon zur observeOn .

Beachten Sie jedoch, dass onBackpressureBuffer seine Quelle auf unbegrenzte Weise verbraucht, d. onBackpressureBuffer , Ohne dass ein Gegendruck darauf onBackpressureBuffer wird. Dies hat zur Folge, dass auch eine Gegendruck unterstützende Quelle wie range vollständig realisiert wird.

Es gibt 4 zusätzliche Überladungen von onBackpressureBuffer

onBackpressureBuffer (int. Kapazität)

Dies ist eine begrenzte Version, die BufferOverflowError signalisiert, falls der Puffer die angegebene Kapazität erreicht.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Die Bedeutung dieses Operators nimmt ab, da immer mehr Operatoren die Einstellung der Puffergröße zulassen. Im Übrigen bietet dies die Möglichkeit, den internen Puffer zu erweitern, indem eine größere Anzahl mit onBackpressureBuffer als die Standardeinstellung verwendet wird.

onBackpressureBuffer (int capacity, Action0 onOverflow)

Diese Überladung ruft eine (gemeinsam genutzte) Aktion für den Fall eines Überlaufs auf. Seine Nützlichkeit ist eher begrenzt, da über den Überlauf keine anderen Informationen als der aktuelle Aufrufstapel bereitgestellt werden.

onBackpressureBuffer (int capacity, Action0 onOverflow, BackpressureOverflow.Strategy-Strategie)

Diese Überlastung ist tatsächlich sinnvoller, da sie definieren soll, was zu tun ist, wenn die Kapazität erreicht ist. Die BackpressureOverflow.Strategy ist eigentlich eine Schnittstelle, aber die Klasse BackpressureOverflow bietet 4 statische Felder, deren Implementierungen typische Aktionen darstellen:

  • ON_OVERFLOW_ERROR : Dies ist das Standardverhalten der vorherigen beiden Überladungen, das eine BufferOverflowException signalisiert
  • ON_OVERFLOW_DEFAULT : Momentan ist es das gleiche wie ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : Wenn ein Überlauf auftreten würde, wird der aktuelle Wert einfach ignoriert und nur die alten Werte werden einmalig von den Downstream-Anforderungen geliefert.
  • ON_OVERFLOW_DROP_OLDEST : ON_OVERFLOW_DROP_OLDEST das älteste Element im Puffer und fügt den aktuellen Wert hinzu.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Beachten Sie, dass die letzten beiden Strategien eine Diskontinuität im Stream verursachen, wenn Elemente ausgeblendet werden. Darüber hinaus signalisieren sie keine BufferOverflowException .

onBackpressureDrop ()

Wenn der Downstream nicht bereit ist, Werte zu empfangen, wird dieser Operator dieses Element aus der Sequenz entfernen. Man kann sich das als einen onBackpressureBuffer mit der Kapazität onBackpressureBuffer ON_OVERFLOW_DROP_LATEST .

Dieser Operator ist nützlich, wenn Werte aus einer Quelle (z. B. Mausbewegungen oder aktuelle GPS-Positionssignale) sicher ignoriert werden können, da später aktuellere Werte angezeigt werden.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Dies kann in Verbindung mit dem interval() nützlich sein. Wenn beispielsweise eine periodische Hintergrundaufgabe ausgeführt werden soll, die Iteration jedoch länger als die Periode dauern kann, kann die Benachrichtigung über das überschüssige Intervall gelöscht werden, da später mehr angezeigt wird:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Es gibt eine Überladung dieses Operators: onBackpressureDrop(Action1<? super T> onDrop) bei der die (gemeinsam genutzte) Aktion aufgerufen wird, wobei der Wert fallen gelassen wird. Diese Variante ermöglicht das Bereinigen der Werte selbst (z. B. Freigeben zugehöriger Ressourcen).

onBackpressureLatest ()

Der letzte Operator behält nur den neuesten Wert und überschreibt praktisch ältere, nicht gelieferte Werte. Man kann sich dies als eine Variante des onBackpressureBuffer mit einer Kapazität von 1 und der Strategie ON_OVERFLOW_DROP_OLDEST .

Im Gegensatz zu onBackpressureDrop steht immer ein Wert für den Verbrauch zur Verfügung, wenn der Downstream hinterherhinkt. Dies kann in einigen telemetrieähnlichen Situationen nützlich sein, in denen die Daten in einem unruhigen Muster erscheinen, aber nur die neuesten Daten für die Verarbeitung interessant sind.

Wenn der Benutzer zum Beispiel viel auf den Bildschirm klickt, möchten wir trotzdem auf seine neuesten Eingaben reagieren.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Die Verwendung von onBackpressureDrop in diesem Fall dazu führen, dass der letzte Klick abgeworfen wird und der Benutzer sich wundert, warum die Geschäftslogik nicht ausgeführt wurde.

Backpressured-Datenquellen erstellen

Das Erstellen von komprimierten Datenquellen ist im Allgemeinen die relativ einfache Aufgabe im Umgang mit dem Backpressure, da die Bibliothek bereits statische Methoden für Observable anbietet, die den Backpressure für den Entwickler handhaben. Wir können zwei Arten von Factory-Methoden unterscheiden: kalte "Generatoren", die entweder Elemente zurückgeben und basierend auf der nachgelagerten Nachfrage erzeugen, und "heiße" Pusher ", die normalerweise nicht reaktive und / oder nicht-unterdrückbare Datenquellen überbrücken und einige Rückdruckhandlungen überlagern Sie.

gerade

Die grundlegendste Rückstau bewusst Quelle über erstellt just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Da wir in onStart ausdrücklich keine Anfragen onStart , wird nichts gedruckt. just toll, wenn es einen konstanten Wert gibt, mit dem wir eine Sequenz starten möchten.

Leider just ist oft für eine Art und Weise falsch , etwas zu berechnen dynamisch durch verbraucht werden Subscriber s:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Überraschend für einige ist, dass 1 zweimal gedruckt wird, anstatt 1 bzw. 2 zu drucken. Wenn der Anruf umgeschrieben wird, wird offensichtlich, warum er so funktioniert:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

Der computeValue wird als Teil der Hauptroutine und nicht als Antwort auf die Abonnenten der Abonnenten aufgerufen.

vonCallable

Was die Leute wirklich brauchen, ist die Methode von fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Hier wird computeValue nur ausgeführt, wenn ein Abonnent abonniert und für jeden von ihnen die erwartete 1 und 2 fromCallable . Natürlich unterstützt fromCallable auch den Gegendruck ordnungsgemäß und gibt den berechneten Wert nicht aus, wenn dies nicht angefordert wird. Beachten Sie jedoch, dass die Berechnung trotzdem stattfindet. Für den Fall, dass die Berechnung selbst verzögert werden soll, bis der Downstream tatsächlich angefordert wird, können wir just mit map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just wird der konstante Wert erst computeValue , wenn er dem Ergebnis des computeValue , der noch für jeden Teilnehmer einzeln aufgerufen wird.

von

Wenn die Daten bereits verfügbar ist als ein Array von Gegenständen, eine Liste von Objekten oder einer Iterable Quelle, wobei die jeweiligen from Überlastungen den Gegendruck und die Emission von solchen Quellen handhaben :

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Der Einfachheit halber (und Warnungen über generische Array - Erstellung zu vermeiden) gibt es 2 bis 10 Argument Überlastungen just , dass intern delegieren from .

Das from(Iterable) bietet auch eine interessante Gelegenheit. Viele Wertschöpfungen können in Form einer Zustandsmaschine ausgedrückt werden. Jedes angeforderte Element löst einen Zustandsübergang und eine Berechnung des zurückgegebenen Werts aus.

Das Schreiben solcher Zustandsmaschinen als Iterable s ist etwas kompliziert (aber immer noch einfacher als das Schreiben eines Observable zum Konsumieren). Im Gegensatz zu C # bietet Java keine Unterstützung des Compilers, um solche Zustandsmaschinen zu erstellen, indem einfach klassisch aussehender Code (mit yield return und yield break ). Einige Bibliotheken bieten einige Hilfestellungen, z. B. AbstractIterable Google Guava und Ix.generate() Ix.forloop() und Ix.forloop() . Diese sind an sich schon einer ganzen Reihe würdig. Sehen wir uns also eine sehr grundlegende Iterable Quelle an, die einen konstanten Wert auf unbestimmte Zeit wiederholt:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Wenn wir den iterator über die klassische for-Schleife verbrauchen, führt dies zu einer Endlosschleife. Da wir daraus ein Observable bauen, können wir unseren Willen zum Ausdruck bringen, nur die ersten fünf davon zu konsumieren und dann aufhören, irgendetwas anzufordern. Dies ist die wahre Macht der trägen Auswertung und Berechnung innerhalb von Observable s.

erstellen (SyncOnSubscribe)

Manchmal ist die Datenquelle, die in die reaktive Welt umgewandelt werden soll, synchron (blockierend) und pull-artig, d. H. Wir müssen eine get oder read Methode aufrufen, um die nächsten Daten zu erhalten. Man könnte das natürlich in eine Iterable aber wenn solche Quellen mit Ressourcen Iterable sind, können diese Ressourcen auslaufen, wenn der Downstream die Sequenz abbestellt, bevor sie enden würde.

Für solche Fälle verfügt RxJava über die SyncOnSubscribe Klasse. Man kann es erweitern und seine Methoden implementieren oder eine seiner Lambda-basierten Factory-Methoden verwenden, um eine Instanz zu erstellen.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

Im SyncOnSubscribe verwendet SyncOnSubscribe 3 Rückrufe.

Die ersten Rückrufe ermöglichen das Erstellen eines Zustands pro Teilnehmer, z. B. FileInputStream im Beispiel. Die Datei wird für jeden einzelnen Teilnehmer unabhängig geöffnet.

Der zweite Callback nimmt dieses onXXX und stellt einen Ausgabe- Observer onXXX dessen onXXX Methoden onXXX werden können, um Werte onXXX . Dieser Callback wird so oft ausgeführt, wie der Downstream angefordert wurde. Bei jedem Aufruf muss onNext höchstens einmal onNext werden, optional gefolgt von onError oder onCompleted . In dem Beispiel rufen wir onCompleted() wenn das gelesene Byte negativ ist, das Ende der Datei onError , und onError falls der IOException eine IOException .

Der letzte Rückruf wird aufgerufen, wenn der Downstream seine Abmeldung aufhebt (den Eingangsstrom schließt) oder wenn der vorherige Rückruf die Terminalmethoden aufgerufen hat. es ermöglicht die Freigabe von Ressourcen. Da nicht alle Quellen alle diese Funktionen benötigen, können die statischen Methoden von SyncOnSubscribe Instanzen ohne sie erstellen.

Leider lösen viele Methodenaufrufe in der gesamten JVM und anderen Bibliotheken geprüfte Ausnahmen aus und müssen in try-catch da die von dieser Klasse verwendeten funktionalen Schnittstellen keine geprüften Ausnahmen zulassen.

Natürlich können wir andere typische Quellen imitieren, wie zum Beispiel eine unbeschränkte Reichweite:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

In diesem Setup beginnt der current mit 0 und beim nächsten Aufruf von Lambda hält der Parameter current jetzt 1 .

Es gibt eine Variante von SyncOnSubscribe Namen AsyncOnSubscribe , die ziemlich ähnlich aussieht, mit der Ausnahme, dass der mittlere Rückruf auch einen langen Wert annimmt, der den Anforderungsbetrag von Downstream darstellt, und der Rückruf sollte ein Observable mit der gleichen Länge generieren. Diese Quelle verkettet dann alle diese Observable zu einer einzigen Sequenz.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Es gibt eine anhaltende (hitzige) Diskussion über die Nützlichkeit dieser Klasse und wird im Allgemeinen nicht empfohlen, da sie routinemäßig die Erwartungen darüber, wie diese generierten Werte tatsächlich ausgegeben werden und wie sie darauf reagieren werden, oder sogar die Art der Anforderungswerte, die sie empfängt, widerlegt komplexere Verbraucherszenarien.

erstellen (Sender)

Manchmal ist die Quelle, die in ein Observable soll, bereits heiß (z. B. Mausbewegungen) oder kalt, jedoch nicht in ihrer API (z. B. ein asynchroner Netzwerkrückruf) unter Druck gesetzt.

In solchen Fällen wurde in einer aktuellen Version von RxJava die Factory-Methode create(emitter) . Es sind zwei Parameter erforderlich:

  • einen Rückruf, der mit einer Instanz der Emitter<T> -Schnittstelle für jeden eingehenden Teilnehmer aufgerufen wird,
  • eine Emitter.BackpressureMode Enumeration, in der der Entwickler das anzuwendende Emitter.BackpressureMode angeben muss. Es hat die üblichen Modi, ähnlich wie bei onBackpressureXXX zusätzlich zu einer MissingBackpressureException oder zum einfachen Ignorieren eines solchen Überlaufs.

Beachten Sie, dass derzeit keine zusätzlichen Parameter für diese Gegendruckmodi unterstützt werden. Wenn Sie diese Anpassung benötigen, verwenden Sie NONE als onBackpressureXXX und wenden den entsprechenden onBackpressureXXX auf das resultierende Observable an.

Der erste typische Fall für die Verwendung, wenn Sie mit einer Push-basierten Quelle interagieren möchten, beispielsweise mit GUI-Ereignissen. Diese APIs verfügen über eine Form von addListener / removeListener Aufrufen, die verwendet werden können:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Der Emitter ist relativ einfach zu benutzen; onNext , onError und onCompleted können darauf onNext , und der Operator übernimmt die Verwaltung des onCompleted und der Abmeldung selbst. Darüber hinaus, wenn die umwickelte API cancellation (wie beispielsweise der Hörer Entfernung in dem Beispiel) unterstützt, kann man die Verwendung setCancellation (oder setSubscription für Subscription -ähnlichen Ressourcen) einen Annullierung Rückruf zu registrieren, wenn der stromabwärtige abmeldet oder die aufgerufen wird onError / onCompleted wird in der bereitgestellten Emitter Instanz aufgerufen.

Bei diesen Methoden kann jeweils nur eine einzige Ressource mit dem Emitter verknüpft werden, und durch das Einrichten einer neuen Ressource wird die alte automatisch deaktiviert. Wenn mehrere Ressourcen behandelt werden müssen, erstellen Sie ein CompositeSubscription , verknüpfen Sie es mit dem Emitter und fügen Sie dem CompositeSubscription selbst weitere Ressourcen hinzu:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Das zweite Szenario umfasst normalerweise eine asynchrone, auf Rückruf basierende API, die in ein Observable konvertiert werden muss.

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

In diesem Fall arbeitet die Delegation auf dieselbe Weise. Leider unterstützen diese klassischen APIs im Callback-Stil normalerweise keine Stornierung, aber wenn dies der Fall ist, können Sie ihre Stornierung genau wie in den vorangegangenen Beispielen einrichten (mit einer etwas aufwendigeren Methode). Beachten Sie die Verwendung des LATEST Gegendruckmodus. Wenn wir wissen, dass es nur einen einzigen Wert gibt, brauchen wir die BUFFER Strategie nicht, da sie einen standardmäßig langen 128-Element-Puffer (der bei Bedarf BUFFER , der niemals vollständig ausgenutzt wird.



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow