Zoeken…


Invoering

Tegendruk is wanneer in een Observable verwerkingspijplijn sommige asynchrone fasen de waarden niet snel genoeg kunnen verwerken en een manier nodig hebben om de stroomopwaartse producent te vertellen om te vertragen.

Het klassieke geval van de noodzaak van tegendruk is wanneer de producent een hete bron is:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

In dit voorbeeld zal de hoofdthread 1 miljoen items produceren voor een eindconsument die deze op een achtergrondthread verwerkt. Het is waarschijnlijk dat de compute(int) enige tijd kost, maar de overhead van de Observable operatieketen kan ook bijdragen aan de tijd die nodig is om items te verwerken. De producerende thread met de for-lus kan dit echter niet weten en blijft onNext .

Intern hebben asynchrone operators buffers om dergelijke elementen vast te houden totdat ze kunnen worden verwerkt. In de klassieke Rx.NET en vroege RxJava waren deze buffers onbegrensd, wat betekent dat ze waarschijnlijk bijna alle 1 miljoen elementen uit het voorbeeld zouden bevatten. Het probleem begint wanneer er bijvoorbeeld 1 miljard elementen zijn of dezelfde reeks van 1 miljoen 1000 keer voorkomt in een programma, wat leidt tot OutOfMemoryError en over het algemeen vertragingen als gevolg van buitensporige GC-overhead.

Net als hoe foutafhandeling een eersteklas burger werd en operators ontving om ermee om te gaan (via onErrorXXX operators), is tegendruk een andere eigenschap van gegevensstromen waar de programmeur over moet nadenken en omgaan (via onBackpressureXXX operators).

Naast het bovenstaande PublishSubject zijn er andere operatoren die geen tegendruk ondersteunen, meestal vanwege functionele redenen. Bijvoorbeeld de bediener interval uitzendt waarden periodiek tegendruk dit zou leiden tot een verschuiving in de tijd ten opzichte van een wand klok.

In moderne RxJava hebben de meeste asynchrone operatoren nu een begrensde interne buffer, zoals observeOn hierboven en elke poging om deze buffer te overlopen, beëindigt de hele reeks met MissingBackpressureException . De documentatie van elke operator heeft een beschrijving van zijn tegendrukgedrag.

Tegendruk is echter subtieler aanwezig in normale koude sequenties (die geen MissingBackpressureException opleveren). Als het eerste voorbeeld wordt herschreven:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Er is geen fout en alles verloopt soepel met een klein geheugengebruik. De reden hiervoor is dat veel observeOn op verzoek waarden kunnen 'genereren' en dat de operator observeOn kan vertellen dat het range maximaal zoveel waarden genereert die de observeOn buffer tegelijk kan bevatten zonder overloop.

Deze onderhandeling is gebaseerd op het computerwetenschapsconcept van co-routines (ik noem u, u belt mij). De operator range stuurt een callback, in de vorm van een implementatie van de Producer -interface, de observeOn door te bellen naar haar (innerlijke Subscriber 's) setProducer . In ruil observeOn roept de observeOn Producer.request(n) met een waarde om aan het range het mag produceren (dat wil zeggen onNext ) te vertellen dat er veel extra elementen zijn. Het is dan de verantwoordelijkheid van observeOn om de request op het juiste moment en met de juiste waarde aan te roepen om de gegevens te laten stromen maar niet te overlopen.

Het is zelden nodig om tegendruk uit te drukken bij eindgebruikers (omdat ze synchroon zijn met betrekking tot hun directe stroomopwaartse en tegendruk gebeurt natuurlijk vanwege call-stack blokkering), maar het is misschien gemakkelijker om de werking ervan te begrijpen:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Hier geeft de onStart implementatie het range aan om zijn eerste waarde te produceren, die vervolgens wordt ontvangen in onNext . Zodra de compute(int) voltooid, wordt de andere waarde uit het range opgevraagd. In een naïeve implementatie van range zou een dergelijke aanroep recursief een beroep doen op onNext , wat leidt tot StackOverflowError wat natuurlijk ongewenst is.

Om dit te voorkomen, gebruiken operators de zogenaamde trampolining-logica die dergelijke terugkomende oproepen voorkomt. In termen van range , zal het onthouden dat er een request(1) aanroep was, terwijl het onNext() onNext() en zodra onNext() terugkeert, zal het nog een ronde maken en onNext() aanroepen met de volgende gehele waarde. Als de twee worden verwisseld, werkt het voorbeeld daarom nog steeds hetzelfde:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Dit geldt echter niet voor onStart . Hoewel de Observable infrastructuur garandeert dat deze hoogstens één keer bij elke Subscriber wordt aangeroepen, kan de oproep om te request(1) meteen de emissie van een element veroorzaken. Als er na de call to request(1) initialisatielogica is die nodig is door onNext , kunnen er uitzonderingen ontstaan:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

In dit synchrone geval wordt onmiddellijk een NullPointerException gegenereerd terwijl deze nog steeds onStart . Een subtielere fout gebeurt als de aanroep request(1) veroorzaakt een asynchrone aanroep onNext op een andere thread lezen en name in onNext races te schrijven in onStart bericht request .

Daarom moet men alle onStart in onStart of zelfs daarvoor uitvoeren en als laatste request() aanroepen. Implementaties van request() in operators zorgen voor een juiste gebeurtenis-voor relatie (of, anders gezegd, geheugenrelease of volledige afrastering) indien nodig.

De onBackpressureXXX-operators

De meeste ontwikkelaars ondervinden tegendruk wanneer hun toepassing mislukt met MissingBackpressureException en de uitzondering MissingBackpressureException meestal naar de operator observeOn . De werkelijke oorzaak is meestal het niet- PublishSubject gebruik van PublishSubject , timer() of interval() of aangepaste operators die zijn gemaakt via create() .

Er zijn verschillende manieren om met dergelijke situaties om te gaan.

De buffergrootte vergroten

Soms ontstaan dergelijke overstorten als gevolg van burstybronnen. Plots tikt de gebruiker te snel op het scherm en observeOn de standaard 16-elementen interne buffer op Android over.

Met de meeste tegendrukgevoelige operators in de recente versies van RxJava kunnen programmeurs nu de grootte van hun interne buffers opgeven. De relevante parameters worden meestal bufferSize , prefetch of capacityHint . Gezien het overvolle voorbeeld in de inleiding, kunnen we gewoon de buffergrootte van observeOn om voldoende ruimte te hebben voor alle waarden.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Merk echter op dat dit in het algemeen slechts een tijdelijke oplossing kan zijn, omdat de overloop nog steeds kan optreden als de bron de voorspelde buffergrootte overproduceert. In dit geval kan een van de volgende operatoren worden gebruikt.

Batching / overslaan waarden met standaard operators

Als de brongegevens in batch efficiënter kunnen worden verwerkt, kan de kans op MissingBackpressureException door een van de standaard batch-operators te gebruiken (op grootte en / of op tijd).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Als sommige van de waarden veilig kunnen worden genegeerd, kan men de bemonstering (met tijd of een andere Observable) en throttling-operators ( throttleFirst , throttleLast , throttleWithTimeout ) gebruiken.

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Merk echter op dat deze operatoren alleen de snelheid van de waardeontvangst door de downstream verlagen en dus nog steeds kunnen leiden tot MissingBackpressureException .

onBackpressureBuffer ()

Deze operator in zijn parameterloze vorm herintroduceert een onbegrensde buffer tussen de stroomopwaartse bron en de stroomafwaartse operator. Ongebonden zijn betekent dat zolang de JVM geen geheugen meer heeft, hij vrijwel elke hoeveelheid kan verwerken die afkomstig is van een bursty-bron.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

In dit voorbeeld gaat de observeOn met een zeer lage buffergrootte, maar er is geen MissingBackpressureException omdat onBackpressureBuffer alle 1 miljoen waarden opzuigt en kleine batches ervan observeOn om te observeOn .

Merk echter op dat onBackpressureBuffer zijn bron op een onbeperkte manier verbruikt, dat wil zeggen zonder daarop enige tegendruk toe te passen. Dit heeft tot gevolg dat zelfs een tegendrukondersteunende bron zoals range volledig zal worden gerealiseerd.

Er zijn 4 extra overbelastingen van onBackpressureBuffer

onBackpressureBuffer (int capaciteit)

Dit is een begrensde versie die BufferOverflowError in het geval dat de buffer de gegeven capaciteit bereikt.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

De relevantie van deze operator neemt af naarmate steeds meer operators hun buffergrootte instellen. Voor de rest biedt dit de mogelijkheid om "hun interne buffer uit te breiden" door een groter aantal met onBackpressureBuffer dan hun standaardwaarde.

onBackpressureBuffer (int capaciteit, Action0 onOverflow)

Deze overbelasting roept een (gedeelde) actie op voor het geval er een overloop optreedt. Het nut ervan is vrij beperkt, omdat er geen andere informatie wordt verstrekt over de overloop dan de huidige oproepstack.

onBackpressureBuffer (int capaciteit, Action0 onOverflow, BackpressureOverflow.Strategiestrategie)

Deze overbelasting is eigenlijk nuttiger omdat het ons laat definiëren wat te doen in het geval dat de capaciteit is bereikt. De BackpressureOverflow.Strategy is eigenlijk een interface, maar de klasse BackpressureOverflow biedt 4 statische velden met implementaties ervan die typische acties vertegenwoordigen:

  • ON_OVERFLOW_ERROR : dit is het standaardgedrag van de vorige twee overbelastingen, signalering van een BufferOverflowException
  • ON_OVERFLOW_DEFAULT : momenteel is dit hetzelfde als ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : als er een overloop zou optreden, wordt de huidige waarde gewoon genegeerd en worden alleen de oude waarden geleverd zodra de downstream-aanvragen zijn ingediend.
  • ON_OVERFLOW_DROP_OLDEST : laat het oudste element in de buffer vallen en voegt de huidige waarde eraan toe.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Merk op dat de laatste twee strategieën discontinuïteit in de stroom veroorzaken als ze elementen wegvallen. Bovendien zullen ze BufferOverflowException niet signaleren.

onBackpressureDrop ()

Wanneer de downstream niet gereed is om waarden te ontvangen, laat deze operator dat elemenet uit de reeks vallen. Je kunt het zien als een 0-capaciteit op onBackpressureBuffer met strategie ON_OVERFLOW_DROP_LATEST .

Deze operator is handig wanneer iemand waarden uit een bron (zoals muisbewegingen of huidige GPS-locatiesignalen) veilig kan negeren, omdat er later meer actuele waarden zijn.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Dit kan handig zijn in combinatie met het interval() . Als u bijvoorbeeld een periodieke achtergrondtaak wilt uitvoeren, maar elke iteratie langer kan duren dan de periode, is het veilig om de kennisgeving van het overtollige interval te laten vallen, omdat er later meer zijn:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Er is één overbelasting van deze operator: onBackpressureDrop(Action1<? super T> onDrop) waar de (gedeelde) actie wordt aangeroepen met de waarde die wordt verwijderd. Met deze variant kunt u de waarden zelf opschonen (bijv. Bijbehorende bronnen vrijgeven).

onBackpressureLatest ()

De laatste operator behoudt alleen de laatste waarde en overschrijft praktisch oudere, niet-geleverde waarden. Je kunt dit zien als een variant van de onBackpressureBuffer met een capaciteit van 1 en strategie van ON_OVERFLOW_DROP_OLDEST .

In tegenstelling tot onBackpressureDrop er altijd een waarde beschikbaar voor consumptie als de downstream achterblijft. Dit kan handig zijn in sommige telemetrie-achtige situaties waarin de gegevens in een bursty-patroon kunnen komen, maar alleen de allernieuwste is interessant voor verwerking.

Als de gebruiker bijvoorbeeld veel op het scherm klikt, willen we nog steeds reageren op de laatste invoer.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Het gebruik van onBackpressureDrop in dit geval leiden tot een situatie waarin de allerlaatste klik valt en de gebruiker zich afvraagt waarom de bedrijfslogica niet is uitgevoerd.

Backpressured gegevensbronnen maken

Het maken van tegendrukgegevensbronnen is in het algemeen de relatief eenvoudige taak bij het omgaan met tegendruk in het algemeen, omdat de bibliotheek al statische methoden op Observable die tegendruk voor de ontwikkelaar verwerken. We kunnen twee soorten fabrieksmethoden onderscheiden: koude "generatoren" die ofwel elementen retourneren en genereren op basis van downstream-vraag en hete "pushers" die meestal niet-reactieve en / of niet-onderdrukbare gegevensbronnen overbruggen en wat backpressure-verwerking op elkaar leggen hen.

alleen maar

De meest fundamentele tegendruk op de hoogte bron wordt gemaakt via just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Omdat we dit niet expliciet in onStart , wordt er niets afgedrukt. just geweldig als er een constante waarde is die we graag een reeks willen starten.

Helaas wordt just vaak verward met een manier om iets dynamisch te berekenen voor consumptie door Subscriber 's:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Verrassend voor sommigen, drukt dit 1 tweemaal af in plaats van respectievelijk 1 en 2 af te drukken. Als de oproep wordt herschreven, wordt het duidelijk waarom het zo werkt:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

De computeValue wordt genoemd als onderdeel van de hoofdroutine en niet als reactie op de inschrijvingen van de abonnees.

fromCallable

Wat mensen eigenlijk nodig hebben, is de methode fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Hier wordt de computeValue alleen uitgevoerd wanneer een abonnee zich abonneert en voor elk van hen de verwachte 1 en 2 afdrukt. Natuurlijk ondersteunt fromCallable ook tegendruk en zal de berekende waarde niet worden fromCallable tenzij daarom wordt gevraagd. Merk echter op dat de berekening toch gebeurt. In het geval dat de berekening zelf wordt uitgesteld totdat de downstream daadwerkelijk vraagt, kunnen we just met map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just zal zijn constante waarde niet uitzenden totdat gevraagd wanneer het wordt toegewezen aan het resultaat van de computeValue , nog onder de naam voor elke abonnee afzonderlijk.

van

Als de gegevens reeds beschikbaar als een array van objecten, een lijst van voorwerpen of Iterable bron, de respectieve from overbelasting zal de tegendruk en emissie van dergelijke bronnen te behandelen:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Voor het gemak (en het vermijden van waarschuwingen over generiek maken van de array) zijn er 2 tot 10 argument overbelasting om just die intern delegeren aan from .

De from(Iterable) ook een interessante mogelijkheid. Veel waardeontwikkeling kan worden uitgedrukt in een vorm van een staatsmachine. Elk aangevraagd element activeert een statusovergang en berekening van de geretourneerde waarde.

Het schrijven van dergelijke state-machines als Iterable s is enigszins ingewikkeld (maar nog steeds gemakkelijker dan het schrijven van een Observable om het te consumeren) en in tegenstelling tot C #, heeft Java geen enkele ondersteuning van de compiler om dergelijke state-machines te bouwen door eenvoudig klassiek ogende code te schrijven (met yield return en yield break ). Sommige bibliotheken bieden hulp, zoals AbstractIterable Google Guava en Ix.generate() en Ix.forloop() . Deze zijn op zichzelf een volledige serie waard, dus laten we eens kijken naar een heel eenvoudige Iterable bron die een constante waarde voor onbepaalde tijd herhaalt:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Als we de iterator zouden consumeren via de klassieke for-loop, zou dat resulteren in een oneindige lus. Omdat we er een Observable van maken, kunnen we onze wil uiten om alleen de eerste 5 ervan te consumeren en dan stoppen met het aanvragen van iets. Dit is de ware kracht van lui evalueren en rekenen binnen Observable s.

te maken (SyncOnSubscribe)

Soms is de gegevensbron die moet worden geconverteerd naar de reactieve wereld zelf synchroon (blokkeren) en pull-like, dat wil zeggen dat we een methode get of read moeten gebruiken om het volgende stuk gegevens te krijgen. Je zou dat natuurlijk in een Iterable maar wanneer dergelijke bronnen worden geassocieerd met bronnen, kunnen we die bronnen lekken als de downstream de reeks uitschrijft voordat deze zou eindigen.

Om dergelijke gevallen af te handelen, heeft SyncOnSubscribe klasse SyncOnSubscribe . Men kan het uitbreiden en zijn methoden implementeren of een van zijn op lambda gebaseerde fabrieksmethoden gebruiken om een instantie te bouwen.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

Over het algemeen gebruikt SyncOnSubscribe 3 callbacks.

Met de eerste callbacks kan men een status per abonnee maken, zoals de FileInputStream in het voorbeeld; het bestand wordt onafhankelijk geopend voor elke individuele abonnee.

De tweede callback neemt deze toestand object en een uitgangssignaal Observer waarvan onXXX werkwijzen kunnen worden opgeroepen om emit waarden. Deze callback wordt zo vaak uitgevoerd als de downstream heeft gevraagd. Bij elke aanroep moet het hoogstens één keer een beroep doen onNext , eventueel gevolgd door onError of onCompleted . In het voorbeeld roepen we onCompleted() als de gelezen byte negatief is, wat aangeeft en het einde van het bestand is, en roepen onError aan voor het geval de read een IOException .

De laatste callback wordt geactiveerd wanneer de downstream zich afmeldt (de inputstream sluit) of wanneer de vorige callback de terminal-methoden oproept; hiermee kunnen middelen worden vrijgemaakt. Omdat niet alle bronnen al deze functies nodig hebben, laten we met de statische methoden van SyncOnSubscribe exemplaren zonder maken.

Helaas gooien veel methodeaanroepen in de JVM en andere bibliotheken gecontroleerde uitzonderingen en moeten ze worden ingepakt in try-catch catches, omdat de functionele interfaces die door deze klasse worden gebruikt, geen gecontroleerde uitzonderingen toestaan.

Natuurlijk kunnen we andere typische bronnen, zoals een onbeperkt bereik, imiteren:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

In deze opstelling begint de current met 0 en de volgende keer dat de lambda wordt opgeroepen, houdt de parameter current nu 1 .

Er is een variant van SyncOnSubscribe genaamd AsyncOnSubscribe die er AsyncOnSubscribe hetzelfde uitziet, met de uitzondering dat de middelste callback ook een lange waarde heeft die het verzoekbedrag van downstream vertegenwoordigt en dat de callback een Observable met exact dezelfde lengte moet genereren. Deze bron voegt dan al deze Observable in een enkele reeks.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Er is een voortdurende (verhitte) discussie over het nut van deze klasse en wordt over het algemeen niet aanbevolen omdat het routinematig verwachtingen breekt over hoe het daadwerkelijk die gegenereerde waarden zal uitzenden en hoe het zal reageren, of zelfs wat voor soort verzoekwaarden het zal ontvangen in meer complexe consumentenscenario's.

maken (emitter)

Soms is de bron die in een Observable moet worden ingepakt al hot (zoals muisbewegingen) of koud, maar niet onderdrukbaar in de API (zoals een asynchrone netwerk-callback).

Om dergelijke gevallen te behandelen, introduceerde een recente versie van RxJava de methode create(emitter) . Er zijn twee parameters voor nodig:

  • een callback die wordt opgeroepen met een exemplaar van de Emitter<T> -interface voor elke inkomende abonnee,
  • een Emitter.BackpressureMode opsomming die de ontwikkelaar Emitter.BackpressureMode het te gebruiken tegendrukgedrag te specificeren. Het heeft de gebruikelijke modi, vergelijkbaar met onBackpressureXXX , naast het signaleren van een MissingBackpressureException of eenvoudigweg het negeren van een dergelijke overflow erin.

Merk op dat het momenteel geen aanvullende parameters ondersteunt voor die tegendrukmodi. Als men die op maat nodig heeft, met behulp van NONE als de tegendruk-modus en toepassing van de relevante onBackpressureXXX op de resulterende Observable is de weg te gaan.

Het eerste typische geval voor het gebruik ervan wanneer men wil communiceren met een op push gebaseerde bron, zoals GUI-evenementen. Die API's bevatten een vorm van addListener / removeListener aanroepen die men kan gebruiken:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

De Emitter is relatief eenvoudig te gebruiken; men kan bellen onNext , onError en onCompleted op het en de exploitant handvatten tegendruk en afmelding beheer op zijn eigen. Als de ingepakte API bovendien annulering ondersteunt (zoals het verwijderen van de luisteraar in het voorbeeld), kan men de setCancellation (of setSubscription voor setSubscription op Subscription setSubscription ) gebruiken om een annulering-callback te registreren die wordt opgeroepen wanneer de downstream zich afmeldt of de onError / onCompleted wordt aangeroepen op de opgegeven Emitter instantie.

Met deze methoden kan slechts een enkele bron tegelijk aan de zender worden gekoppeld en wordt bij het instellen van een nieuwe bron de oude automatisch uitgeschreven. Als u meerdere bronnen moet verwerken, maakt u een CompositeSubscription , koppelt u deze aan de zender en voegt u vervolgens verdere bronnen toe aan de CompositeSubscription zelf:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Het tweede scenario omvat meestal een asynchrone, op callback gebaseerde API die moet worden omgezet in een Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

In dit geval werkt de delegatie op dezelfde manier. Helaas ondersteunen deze klassieke API's in callback-stijl meestal geen annulering, maar als ze dat wel doen, kan men hun annulering instellen net als in de vorige voorbeelden (met misschien een meer betrokken manier). Let op het gebruik van de LATEST tegendrukmodus; als we weten dat er slechts een enkele waarde zal zijn, hebben we de BUFFER strategie niet nodig omdat deze een standaard 128-elementen lange buffer toewijst (die groeit indien nodig) die nooit volledig zal worden gebruikt.



Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow