Sök…


Introduktion

Mottryck är att i en Observable behandlingspipeline kan vissa asynkronsteg inte bearbeta värdena tillräckligt snabbt och behöver ett sätt att säga uppströmsproducenten att sakta ner.

Det klassiska fallet med behovet av mottryck är när producenten är en het källa:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

I detta exempel kommer huvudtråden att producera 1 miljon objekt till en slutkonsument som bearbetar den på en bakgrundstråd. Det är troligt att den compute(int) -metoden tar lite tid men omkostnaderna för den Observable operatörskedjan kan också lägga till den tid det tar att bearbeta objekt. Den producerande tråden med for-loopen kan dock inte veta detta och fortsätter onNext ing.

Internt har asynkrona operatörer buffertar för att hålla sådana element tills de kan bearbetas. I det klassiska Rx.NET och tidiga RxJava var dessa buffertar obegränsade, vilket innebär att de troligen skulle innehålla nästan alla 1 miljon element från exemplet. Problemet börjar när det till exempel finns 1 miljard miljarder eller samma 1 miljon sekvens visas 1000 gånger i ett program, vilket leder till OutOfMemoryError och generellt avtar på grund av överdriven GC-overhead.

I likhet med hur felhantering blev en förstklassig medborgare och fick operatörer att hantera det (via onErrorXXX operatörer), är mottryck en annan egenskap hos dataflöden som programmeraren måste tänka på och hantera (via onBackpressureXXX operatörer).

Utöver PublishSubject ovan finns det andra operatörer som inte stöder mottryck, främst på grund av funktionella skäl. Exempelvis operatören interval emitterar värden periodiskt, backpressuring det skulle leda till att skifta i den period i förhållande till en väggklocka.

I moderna RxJava har de flesta asynkrona operatörer nu en avgränsad intern buffert, som observeOn ovan, och varje försök att överföra denna buffert kommer att avsluta hela sekvensen med MissingBackpressureException . Dokumentationen för varje operatör har en beskrivning av sitt mottryckbeteende.

Men mottrycket finns mer subtilt i vanliga kalla sekvenser (som inte och borde inte ge MissingBackpressureException ). Om det första exemplet skrivs om:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Det finns inget fel och allt går smidigt med liten minnesanvändning. Anledningen till detta är att många källoperatörer kan "generera" värden på begäran och således kan operatören observeOnrange generera högst så många värden observeOn bufferten kan hålla på en gång utan överflöde.

Denna förhandling är baserad på datavetenskapskonceptet för samrutiner (jag ringer dig, du ringer mig). Operatören range sänder en återuppringning, i form av en implementering av Producer gränssnitt, till observeOn genom att anropa dess (inre Subscriber s) setProducer . I gengäld observeOn Producer.request(n) med ett värde för att berätta det range det är tillåtet att producera (dvs. onNext det) att många ytterligare element. Det är sedan observeOn ansvar att ringa request i rätt tid och med rätt värde för att hålla data flyter men inte överflöda.

Att uttrycka mottryck hos slutkunder är sällan nödvändigt (eftersom de är synkrona med avseende på deras omedelbara uppströms och mottryck sker naturligt på grund av blockering av samtalstack), men det kan vara lättare att förstå hur det fungerar:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Här onStart implementeringen range att producera sitt första värde, som sedan onNext emot i onNext . När compute(int) är klar begärs det andra värdet från range . I en naiv implementering av range skulle ett sådant samtal rekursivt ringa på onNext , vilket leder till StackOverflowError vilket naturligtvis är oönskat.

För att förhindra detta använder operatörer så kallad trampolineringslogik som förhindrar sådana reentrantsamtal. I range kommer det att komma ihåg att det fanns ett request(1) samtal medan det ringde onNext() och när onNext() kommer tillbaka kommer det att göra en ny omgång och ringa onNext() med nästa heltalvärde. Därför, om de två byts, fungerar exemplet fortfarande på samma sätt:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Detta är dock inte sant för onStart . Även om den Observable infrastrukturen garanterar att den kommer att ringas upp högst en gång på varje Subscriber kan samtalet att request(1) utlösa utsläpp av ett element direkt. Om en har initialiseringslogik efter samtalet request(1) som behövs av onNext , kan du sluta med undantag:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

I detta synkrona fall NullPointerException en NullPointerException omedelbart medan du fortfarande kör onStart . Ett mer subtilt fel inträffar om samtalet att request(1) utlöser ett asynkront samtal till onNext på någon annan tråd och läser name i onNext raser som skriver det i onStart post- request .

Därför bör man göra alla onStart i onStart eller till och med innan och samtalsbegäran request() senast. Implementeringar av request() hos operatörer säkerställer korrekt händelse innan relation (eller i andra termer, minne frisläppande eller fullt staket) vid behov.

OnBackpressureXXX-operatörerna

De flesta utvecklare stöter på mottryck när deras applikation misslyckas med MissingBackpressureException och undantaget pekar vanligtvis på observeOn . Den faktiska orsaken är vanligtvis den icke-trycktryckta användningen av PublishSubject , timer() eller interval() eller anpassade operatörer som skapats via create() .

Det finns flera sätt att hantera sådana situationer.

Öka buffertstorlekarna

Ibland inträffar sådana överflöden på grund av skitliga källor. Plötsligt observeOn användaren på skärmen för snabbt och observeOn standard 16-elementets interna buffert på Android-överflöd.

De flesta mottryckskänsliga operatörer i de senaste versionerna av RxJava tillåter nu programmerare att ange storleken på sina interna buffertar. De relevanta parametrarna kallas vanligtvis bufferSize , prefetch eller capacityHint . Med tanke på det överfyllda exemplet i introduktionen kan vi bara öka observeOn buffertstorlek för att ha tillräckligt med utrymme för alla värden.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Observera dock att generellt sett kan detta endast vara en tillfällig fix eftersom översvämningen fortfarande kan hända om källan överproducerar den förutsagda buffertstorleken. I detta fall kan man använda en av följande operatörer.

Sats / hoppa över värden med standardoperatörer

Om källdata kan behandlas mer effektivt i batch kan man minska sannolikheten för MissingBackpressureException genom att använda en av de vanliga batchningsoperatörerna (efter storlek och / eller med tiden).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Om vissa av värdena kan ignoreras på ett säkert sätt kan man använda provtagningen (med tiden eller någon annan observerbar) och throttleFirst ( throttleFirst , throttleLast , throttleWithTimeout ).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Observera hovewer att dessa operatörer bara minskar frekvensen för värdemottagning av nedströms och därför kan de fortfarande leda till MissingBackpressureException .

onBackpressureBuffer ()

Denna operatör i sin parameterlösa form återinför en obegränsad buffert mellan uppströmskällan och nedströmsoperatören. Att vara obegränsat innebär så länge JVM inte slutar i minnet, det kan hantera nästan alla belopp som kommer från en skördig källa.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

I detta exempel går observeOn med en mycket låg buffertstorlek men det finns ingen MissingBackpressureException eftersom onBackpressureBuffer suger upp alla 1 miljon värden och överlämnar små partier av det för att observeOn .

Observera dock att onBackpressureBuffer konsumerar sin källa på obegränsat sätt, det vill säga utan att använda något mottryck på det. Detta har konsekvensen att till och med en baktryckstödjande källa som range kommer att realiseras fullständigt.

Det finns ytterligare 4 överbelastningar av onBackpressureBuffer

onBackpressureBuffer (int-kapacitet)

Detta är en begränsad version som signalerar BufferOverflowError i fall dess buffert når den givna kapaciteten.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Denna operatørs relevans minskar när fler och fler operatörer nu tillåter att ställa in sina buffertstorlekar. För resten ger detta en möjlighet att "utöka sin interna buffert" genom att ha ett större antal med onBackpressureBuffer än deras standard.

onBackpressureBuffer (int-kapacitet, Action0 onOverflow)

Denna överbelastning kallar en (delad) åtgärd i händelse av att ett överflöde inträffar. Dess användbarhet är ganska begränsad eftersom det inte finns någon annan information om överflödet än den nuvarande samtalstacken.

onBackpressureBuffer (int-kapacitet, Action0 onOverflow, BackpressureOverflow.Strategy strategi)

Denna överbelastning är faktiskt mer användbar eftersom det låter oss definiera vad man ska göra om kapaciteten har uppnåtts. BackpressureOverflow.Strategy är faktiskt ett gränssnitt men klassen BackpressureOverflow erbjuder fyra statiska fält med implementeringar av det som representerar typiska åtgärder:

  • ON_OVERFLOW_ERROR : detta är standardbeteendet för de två tidigare överbelastningarna, vilket signalerar en BufferOverflowException
  • ON_OVERFLOW_DEFAULT : för närvarande är det samma som ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : Om ett överflöde skulle inträffa ignoreras det aktuella värdet helt enkelt och endast de gamla värdena kommer att levereras när nedströmmen begär det.
  • ON_OVERFLOW_DROP_OLDEST : tappar det äldsta elementet i bufferten och lägger till det aktuella värdet till det.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Observera att de två sista strategierna orsakar diskontinuitet i strömmen när de tappar bort element. Dessutom kommer de inte att signalera BufferOverflowException .

onBackpressureDrop ()

Närhelst nedströms inte är redo att ta emot värden, kommer denna operatör att släppa det valet från sekvensen. Man kan tänka på det som en 0-kapacitet onBackpressureBuffer med strategi ON_OVERFLOW_DROP_LATEST .

Den här operatören är användbar när man säkert kan ignorera värden från en källa (t.ex. musrörelser eller aktuella GPS-positionssignaler) eftersom det kommer att finnas fler aktuella värden senare.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Det kan vara användbart i samband med källoperatörens interval() . Om man till exempel vill utföra någon periodisk bakgrundsuppgift men varje iteration kan pågå längre än perioden är det säkert att tappa överskottsintervallmeddelandet eftersom det kommer att finnas mer senare:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Det finns en överbelastning av den här operatören: onBackpressureDrop(Action1<? super T> onDrop) där åtgärden (delad) kallas med värdet som tappas. Denna variant gör det möjligt att rensa upp själva värdena (t.ex. släppa tillhörande resurser).

onBackpressureLatest ()

Den slutliga operatören behåller bara det senaste värdet och överskriver praktiskt taget äldre, ej levererade värden. Man kan tänka på detta som en variant av onBackpressureBuffer med en kapacitet på 1 och strategi för ON_OVERFLOW_DROP_OLDEST .

Till skillnad från onBackpressureDrop finns det alltid ett värde tillgängligt för konsumtion om nedströmsen höll sig efter. Detta kan vara användbart i vissa telemetriliknande situationer där uppgifterna kan komma i något burstigt mönster men bara det allra senaste är intressant för bearbetning.

Om användaren till exempel klickar mycket på skärmen vill vi fortfarande reagera på dess senaste inmatning.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Användningen av onBackpressureDrop i det här fallet skulle leda till en situation där det sista klicket tappas och lämnar användaren undrar varför affärslogiken inte genomfördes.

Skapa mottryckta datakällor

Att skapa mottryckta datakällor är den relativt lättare uppgiften när man hanterar mottryck i allmänhet eftersom biblioteket redan erbjuder statiska metoder på Observable som hanterar mottryck för utvecklaren. Vi kan skilja två typer av fabriksmetoder: kalla "generatorer" som antingen returnerar och genererar element baserade på nedströmsefterfrågan och heta "pushers" som vanligtvis överbryggar icke-reaktiva och / eller icke-mottryckbara datakällor och lägger en viss tryckhantering ovanpå dem.

bara

Den mest grundläggande källan mot mottryck skapas via just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Eftersom vi uttryckligen inte begär in onStart kommer detta inte att skriva ut något. just är bra när det finns ett konstant värde som vi vill hoppa av en sekvens.

Tyvärr just är ofta misstas för ett sätt att beräkna något dynamiskt att konsumeras av Subscriber s:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Förvånande för vissa skriver detta ut två gånger istället för att skriva ut respektive 1. Om samtalet skrivs om blir det uppenbart varför det fungerar så:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue kallas som en del av huvudrutinen och inte som svar på att abonnenterna prenumererar.

fromCallable

Vad människor faktiskt behöver är metoden från fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Här computeValue endast när en abonnent prenumererar och för var och en av dem, trycker de förväntade 1 och 2. Naturligtvis fromCallable också ordentligt mottryck och avger inte det beräknade värdet såvida det inte begärs. Observera dock att beräkningen sker ändå. Om beräkningen i sig ska försenas tills nedströmmen faktiskt begär, kan vi använda just med map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

kommer just inte att avge sitt konstant värde förrän det begärs när det mappas till resultatet av computeValue , som fortfarande computeValue för varje abonnent individuellt.

från

Om uppgifterna redan är tillgängliga som en grupp av objekt, en lista med objekt eller någon Iterable källa, kommer de respektive from överbelastningar att hantera mottrycket och emissionen från sådana källor:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

För bekvämlighet (och undvika varningar om generisk array skapande) det finns två till 10 argument överbelastningar till just som internt delegera till from .

from(Iterable) ger också en intressant möjlighet. Många värdeskapande kan uttryckas i form av en tillståndsmaskin. Varje begärt element utlöser en tillståndsövergång och beräkning av det returnerade värdet.

Att skriva sådana tillståndsmaskiner som Iterable s är ganska komplicerat (men ändå enklare än att skriva en Observable för att konsumera den) och till skillnad från C # har Java inget stöd från kompilatorn för att bygga sådana tillståndsmaskiner genom att helt enkelt skriva klassiskt ser kod (med yield return och yield break ). Vissa bibliotek erbjuder lite hjälp, till exempel Google Guava's AbstractIterable och IxJava's Ix.generate() och Ix.forloop() . Dessa är i sig själva värda en hel serie så låt oss se någon väldigt grundläggande Iterable källa som upprepar ett konstant värde på obestämd tid:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Om vi konsumerar iterator via klassisk för-loop, skulle det resultera i en oändlig slinga. Eftersom vi bygger ett Observable utifrån det, kan vi uttrycka vår vilja att bara konsumera de första 5 och sedan sluta att begära något. Detta är den verkliga kraften i att lata utvärdera och beräkna inuti Observable s.

skapa (SyncOnSubscribe)

Ibland är datakällan som ska konverteras till den reaktiva världen själv synkron (blockerande) och pull-liknande, det vill säga, vi måste ringa någon get eller read metod för att få nästa databitar. Man kan naturligtvis förvandla det till en Iterable men när sådana källor är associerade med resurser, kan vi läcka dessa resurser om nedströmsen tecknar sekvensen innan den skulle ta slut.

För att hantera sådana fall har SyncOnSubscribe klassen SyncOnSubscribe . Man kan utöka det och implementera sina metoder eller använda en av sina lambdabaserade fabriksmetoder för att bygga en instans.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

Generellt använder SyncOnSubscribe 3 återuppringningar.

De första återuppringningarna tillåter en att skapa ett tillstånd per abonnent, till exempel FileInputStream i exemplet; filen öppnas oberoende för varje enskild abonnent.

Den andra återuppringning tar detta tillstånd objekt och ger en utgångs Observer vars onXXX metoder kan anropas för att sända ut värdena. Denna återuppringning utförs så många gånger som nedströms begärde. Vid varje kallelse måste den ringa på onNext högst en gång valfritt följt av antingen onError eller onCompleted . I exemplet kallar vi onCompleted() om läsbyte är negativt, indikerar och slutar på fil, och ringer onError i fall läsning kastar en IOException .

Den sista återuppringningen aktiveras när nedströmmen avslutar abonnemanget (stänger ingångsströmmen) eller när den tidigare återuppringningen anropade terminalmetoderna; det gör det möjligt att frigöra resurser. Eftersom inte alla källor behöver alla dessa funktioner, gör de statiska metoderna i SyncOnSubscribe låt en skapa instanser utan dem.

Tyvärr, många metodsamtal över JVM och andra bibliotek kastar kontrollerade undantag och måste förpackas i try-catch es eftersom de funktionella gränssnitten som används av denna klass inte tillåter kastade kontrollerade undantag.

Naturligtvis kan vi imitera andra typiska källor, till exempel ett obegränsat intervall med det:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

Med den här inställningen den current börjar med 0 och nästa gång lambda anropas parametern current håller nu 1 .

Det finns en variant av SyncOnSubscribe heter AsyncOnSubscribe som ser ganska lika ut, med undantag för att den mellersta återuppringningen också tar ett långt värde som representerar begäran från nedströms och återuppringningen bör generera en Observable med exakt samma längd. Denna källa sammanfogar sedan alla dessa Observable i en enda sekvens.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Det pågår en pågående (upphettad) diskussion om användbarheten för denna klass och generellt inte rekommenderas eftersom den rutinmässigt bryter förväntningarna om hur den faktiskt kommer att avge de genererade värden och hur den kommer att svara på, eller till och med vilken typ av förfrågningsvärden den kommer att få i mer komplexa konsument scenarier.

skapa (emitter)

Ibland är källan som ska lindas in i en Observable redan varm (som musrörelser) eller kall men inte återtryckbar i sin API (t.ex. ett asynkron återuppringning av nätverket).

För att hantera sådana fall införde en ny version av RxJava fabriksmetoden create(emitter) . Det tar två parametrar:

  • en återuppringning som kommer att ringas med en instans av gränssnittet Emitter<T> för varje inkommande abonnent,
  • en uppräkning av Emitter.BackpressureMode som Emitter.BackpressureMode att utvecklaren anger det baktrycksbeteende som ska tillämpas. Det har de vanliga lägena, liknande onBackpressureXXX förutom att signalera en MissingBackpressureException eller helt enkelt ignorera sådant överflöde inuti det helt.

Observera att det för närvarande inte stöder ytterligare parametrar för dessa mottryckslägen. Om man behöver den anpassningen, använder man NONE som mottrycksläge och tillämpar relevant onBackpressureXXX på den resulterande Observable är vägen att gå.

Det första typiska fallet för dess användning när man vill interagera med en push-baserad källa, till exempel GUI-händelser. Dessa API: er har någon form av addListener / removeListener samtal som man kan använda:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter är relativt enkel att använda; man kan ringa på onNext , onError och onCompleted på det och operatören hanterar mottryck och avbokningshantering på egen hand. Om det inslagna API stöder avbokning (till exempel avlägsnandet av lyssnaren i exemplet) kan man dessutom använda setCancellation (eller setSubscription för Subscription resurser) för att registrera ett annulleringskall som återkallas när nedströmsen onError eller onError / onCompleted kallas på den Emitter instansen.

Dessa metoder gör att endast en enda resurs kan kopplas till sändaren åt gången och om du ställer in en ny avregistrerar den gamla automatiskt. Om man måste hantera flera resurser skapar du en CompositeSubscription , kopplar den till emittern och lägger sedan till ytterligare resurser till CompositeSubscription själv:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Det andra scenariot involverar vanligtvis ett asynkront, återuppringningsbaserat API som måste konverteras till ett Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

I detta fall fungerar delegationen på samma sätt. Tyvärr stöder dessa klassiska återuppringningsstyler vanligtvis inte avbokning, men om de gör det, kan man ställa in deras avbokning precis som i föregående exempel (med kanske ett mer involverat sätt men). Observera användningen av det LATEST mottrycksläget; om vi vet att det bara kommer att finnas ett enda värde behöver vi inte BUFFER strategin eftersom den tilldelar en standardbuffert med 128 element (som växer vid behov) som aldrig kommer att utnyttjas fullt ut.



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow