Ricerca…


introduzione

La contropressione avviene quando in una pipeline di elaborazione Observable alcuni stadi asincroni non sono in grado di elaborare i valori abbastanza velocemente e hanno bisogno di un modo per dire al produttore a monte di rallentare.

Il caso classico della necessità di contropressione è quando il produttore è una fonte di calore:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

In questo esempio, il thread principale produrrà 1 milione di elementi per un consumatore finale che lo sta elaborando su un thread in background. È probabile che il metodo di compute(int) richieda del tempo, ma il sovraccarico della catena di operatori Observable può anche aumentare il tempo necessario per elaborare gli articoli. Tuttavia, il thread di produzione con il ciclo for non può sapere ciò e continua a onNext .

Internamente, gli operatori asincroni dispongono di buffer per conservare tali elementi fino a quando non possono essere elaborati. Nel classico Rx.NET e all'inizio RxJava, questi buffer erano illimitati, il che significa che probabilmente conserverebbero quasi 1 milione di elementi dall'esempio. Il problema inizia quando ci sono, per esempio, 1 miliardo di elementi o la stessa sequenza di 1 milione appare 1000 volte in un programma, portando a OutOfMemoryError e generalmente rallentamenti a causa dell'overhead eccessivo del GC.

Analogamente a come la gestione degli errori è diventata un cittadino di prima classe e ha ricevuto operatori per affrontarla (tramite operatori onErrorXXX ), la backpressure è un'altra proprietà dei flussi di dati che il programmatore deve pensare e gestire (tramite onBackpressureXXX operatori di onBackpressureXXX ).

Oltre a PublishSubject sopra, ci sono altri operatori che non supportano la contropressione, principalmente per ragioni funzionali. Ad esempio, l' interval operatore emette valori periodicamente, la contropressione porterebbe a spostarsi nel periodo relativo a un orologio da parete.

Nel moderno RxJava, la maggior parte degli operatori asincroni ora ha un buffer interno limitato, come observeOn sopra e qualsiasi tentativo di overflow di questo buffer terminerà l'intera sequenza con MissingBackpressureException . La documentazione di ciascun operatore ha una descrizione del suo comportamento di contropressione.

Tuttavia, la contropressione è presente in modo più sottile nelle sequenze fredde regolari (che non producono e non dovrebbero produrre MissingBackpressureException ). Se il primo esempio è stato riscritto:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Non ci sono errori e tutto funziona senza problemi con l'utilizzo di memoria di piccole dimensioni. La ragione di ciò è che molti operatori sorgente possono "generare" valori su richiesta e quindi l'operatore observeOn può dire che l' range genera al massimo così tanti valori che il buffer observeOn può contenere contemporaneamente senza overflow.

Questa trattativa si basa sul concetto di informatica delle co-routine (io ti chiamo, tu mi chiami). L'operatore range invia un callback, sotto forma di un'implementazione del Producer dell'interfaccia, alla observeOn chiamando il suo (interna Subscriber s') setProducer . In cambio, observeOn chiama Producer.request(n) con un valore per indicare range che è autorizzato a produrre (cioè, su onNext ) molti altri elementi. È quindi responsabilità observeOn chiamare il metodo di request nel momento giusto e con il giusto valore per mantenere i dati fluidi ma non straripanti.

Raramente la necessità di esprimere una contropressione nei consumatori finali (perché sono sincroni rispetto al loro upstream immediato e alla contropressione avviene naturalmente a causa del blocco delle chiamate), ma potrebbe essere più facile capirne il funzionamento:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Qui l'implementazione di onStart indica l' range per produrre il suo primo valore, che viene quindi ricevuto in onNext . Una volta terminato il compute(int) , viene richiesto un altro valore range . In un'implementazione ingenua range , tale chiamata chiamerebbe ricorsivamente onNext , portando a StackOverflowError che ovviamente non è desiderabile.

Per evitare ciò, gli operatori utilizzano la cosiddetta logica del trampolino che impedisce tali chiamate di rientro. Nei termini range , ricorderà che c'era una chiamata di request(1) mentre chiamava onNext() e una volta onNext() restituisce, farà un altro giro e chiamerà onNext() con il valore intero successivo. Pertanto, se i due vengono scambiati, l'esempio funziona ancora allo stesso modo:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Tuttavia, questo non è vero per onStart . Sebbene l'infrastruttura Observable garantisca che venga chiamata al più una volta su ciascun Subscriber , la chiamata alla request(1) può far scattare immediatamente l'emissione di un elemento. Se si ha una logica di inizializzazione dopo la chiamata alla request(1) che è necessaria per onNext , si può finire con eccezioni:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

In questo caso sincrono, una NullPointerException verrà lanciata immediatamente mentre sta ancora eseguendo onStart . Un bug più sottile si verifica se la chiamata alla request(1) innesca una chiamata asincrona a onNext su qualche altro thread e il name lettura in onNext gare lo scrive in onStart request post- onStart .

Pertanto, si dovrebbe eseguire l'inizializzazione di tutti i campi in onStart o anche prima e chiamare la request() ultimo. Le implementazioni di request() negli operatori assicurano la corretta relazione happening-before (o in altri termini, rilascio di memoria o recinzione completa) quando necessario.

Gli operatori di onBackpressureXXX

La maggior parte degli sviluppatori incontra la contropressione quando la loro applicazione fallisce con MissingBackpressureException e l'eccezione di solito punta all'operatore observeOn . La causa effettiva è solitamente l'uso non PublishSubject di PublishSubject , timer() o interval() o operatori personalizzati creati tramite create() .

Esistono diversi modi per gestire tali situazioni.

Aumentando le dimensioni del buffer

A volte tali traboccamenti si verificano a causa di fonti esplosive. All'improvviso, l'utente tocca lo schermo troppo velocemente e observeOn buffer interno di 16 elementi predefinito su overflow Android.

La maggior parte degli operatori sensibili alla retropressione nelle versioni recenti di RxJava ora consente ai programmatori di specificare la dimensione dei loro buffer interni. I parametri rilevanti sono solitamente chiamati bufferSize , prefetch o capacityHint . Dato l'esempio traboccante nell'introduzione, possiamo semplicemente aumentare la dimensione del buffer di observeOn per avere spazio sufficiente per tutti i valori.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Si noti tuttavia che, in generale, questa può essere solo una correzione temporanea poiché l'overflow può ancora verificarsi se l'origine del prodotto supera la dimensione del buffer prevista. In questo caso, è possibile utilizzare uno dei seguenti operatori.

Valori batch / saltati con operatori standard

Nel caso in cui i dati di origine possano essere elaborati in modo più efficiente in batch, è possibile ridurre la probabilità di MissingBackpressureException utilizzando uno degli operatori di batch standard (per dimensione e / o tempo).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Se alcuni dei valori possono essere tranquillamente ignorati, è possibile utilizzare il campionamento (con il tempo o un altro Osservabile) e gli operatori di limitazione ( throttleFirst , throttleLast , throttleWithTimeout ).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Si noti che hovewer questi operatori riducono solo il tasso di ricezione del valore da parte del downstream e quindi possono ancora portare a MissingBackpressureException .

onBackpressureBuffer ()

Questo operatore nella sua forma senza parametri reintroduce un buffer illimitato tra l'origine upstream e l'operatore downstream. Essere illimitati significa che finché la JVM non esaurisce la memoria, può gestire quasi tutte le quantità provenienti da una fonte esplosiva.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

In questo esempio, observeOn una dimensione del buffer molto bassa, ma non c'è MissingBackpressureException poiché onBackpressureBuffer assorbe tutti i 1 milione di valori e passa a piccoli lotti di esso per observeOn .

Si noti tuttavia che onBackpressureBuffer consuma la propria origine in modo illimitato, cioè senza applicare alcuna contropressione ad esso. Ciò ha come conseguenza che anche una sorgente che supporta la contropressione come l' range sarà completamente realizzata.

Ci sono 4 sovraccarichi aggiuntivi di onBackpressureBuffer

onBackpressureBuffer (capacità int)

Questa è una versione limitata che segnala BufferOverflowError nel caso in cui il buffer raggiunga la capacità specificata.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

La rilevanza di questo operatore è in diminuzione poiché sempre più operatori consentono ora di impostare le dimensioni del buffer. Per il resto, questo dà l'opportunità di "estendere il loro buffer interno" avendo un numero maggiore con onBackpressureBuffer rispetto al loro valore predefinito.

onBackpressureBuffer (int capacity, Action0 onOverflow)

Questo overload richiama un'azione (condivisa) nel caso in cui si verifichi un overflow. La sua utilità è piuttosto limitata in quanto non vi sono altre informazioni fornite sull'overflow rispetto allo stack di chiamate corrente.

onBackpressureBuffer (int capacity, Action0 onOverflow, BackpressureOverflow.Strategy strategy)

Questo sovraccarico è in realtà più utile in quanto consente di definire cosa fare nel caso in cui la capacità sia stata raggiunta. BackpressureOverflow.Strategy è in realtà un'interfaccia ma la classe BackpressureOverflow offre 4 campi statici con implementazioni di esso che rappresentano azioni tipiche:

  • ON_OVERFLOW_ERROR : questo è il comportamento predefinito dei due overload precedenti, segnalando BufferOverflowException
  • ON_OVERFLOW_DEFAULT : attualmente è uguale a ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : se dovesse verificarsi un overflow, il valore corrente verrà semplicemente ignorato e solo i vecchi valori verranno consegnati una volta che le richieste downstream.
  • ON_OVERFLOW_DROP_OLDEST : rilascia l'elemento più vecchio nel buffer e aggiunge il valore corrente ad esso.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Si noti che le ultime due strategie causano discontinuità nel flusso mentre eliminano gli elementi. Inoltre, non segnalano BufferOverflowException .

onBackpressureDrop ()

Ogni volta che il downstream non è pronto a ricevere i valori, questo operatore lo farà cadere dalla sequenza. Si può pensare a una capacità 0 onBackpressureBuffer con la strategia ON_OVERFLOW_DROP_LATEST .

Questo operatore è utile quando si possono tranquillamente ignorare i valori da una sorgente (come mosse del mouse o segnali di posizione GPS correnti) poiché ci saranno più valori aggiornati in seguito.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Può essere utile in congiunzione con l' interval() operatore sorgente interval() . Ad esempio, se si desidera eseguire un'attività di background periodica ma ciascuna iterazione può durare più a lungo del periodo, è possibile rilasciare la notifica di intervallo in eccesso in quanto ci sarà più tardi su:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Esiste un overload di questo operatore: onBackpressureDrop(Action1<? super T> onDrop) dove viene chiamata l'azione (condivisa) con il valore che viene eliminato. Questa variante consente di ripulire i valori stessi (ad esempio, rilasciando risorse associate).

onBackpressureLatest ()

L'operatore finale mantiene solo l'ultimo valore e praticamente sovrascrive i valori meno recenti e non consegnati. Si può pensare a questa come una variante di onBackpressureBuffer con una capacità di 1 e una strategia di ON_OVERFLOW_DROP_OLDEST .

A differenza di onBackpressureDrop c'è sempre un valore disponibile per il consumo se il downstream è in ritardo. Questo può essere utile in alcune situazioni simili alla telemetria in cui i dati possono presentarsi in qualche schema di raffica, ma solo l'ultima è interessante per l'elaborazione.

Ad esempio, se l'utente fa clic molto sullo schermo, vorremmo comunque rispondere al suo ultimo input.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

L'uso di onBackpressureDrop in questo caso porterebbe a una situazione in cui l'ultimo clic viene rilasciato e lascia all'utente chiedersi perché la logica di business non è stata eseguita.

Creazione di origini dati retropresse

La creazione di origini dati backpressured è il compito relativamente più semplice quando si ha a che fare con la backpressure in generale, poiché la libreria offre già metodi statici su Observable che gestiscono la contropressione per lo sviluppatore. Possiamo distinguere due tipi di metodi di fabbrica: "generatori" freddi che restituiscono e generano elementi basati sulla domanda a valle e "spacciatori" a caldo che di solito collegano fonti di dati non reattivi e / o non contropressurabili e sovrappongono la gestione della contropressione in cima a loro.

appena

La sorgente di backpressure più elementare è creata just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Dal momento che non richiediamo esplicitamente in onStart , questo non stampa nulla. è just fantastico quando c'è un valore costante in cui vorremmo avviare una sequenza.

Purtroppo, just viene spesso scambiato per un modo per calcolare dinamicamente qualcosa per essere consumato da Subscriber s:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Sorprendente per alcuni, questa stampa 1 due volte invece di stampare 1 e 2 rispettivamente. Se la chiamata viene riscritta, diventa ovvio il motivo per cui funziona così:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

Il computeValue viene chiamato come parte della routine principale e non in risposta agli abbonati che sottoscrivono.

fromCallable

Ciò di cui le persone hanno effettivamente bisogno è il metodo fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Qui il computeValue viene eseguito solo quando un sottoscrittore sottoscrive e per ciascuno di essi, stampando l'1 e il 2 previsti. Naturalmente, anche fromCallable supporta correttamente la contropressione e non emetterà il valore calcolato se non richiesto. Si noti comunque che il calcolo avviene comunque. Nel caso in cui il calcolo stesso debba essere ritardato fino a quando il downstream richiede effettivamente, possiamo usare just con la map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just non emetterà il suo valore costante fino a quando richiesto quando è mappato al risultato del valore computeValue , ancora chiamato singolarmente per ogni utente.

a partire dal

Se i dati sono già disponibili come una matrice di oggetti, un elenco di oggetti o qualsiasi sorgente Iterable , i rispettivi from sovraccarichi gestiranno la contropressione e l'emissione di tali fonti:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Per comodità (ed evitando gli avvertimenti sulla creazione di array generici) ci sono da 2 a 10 sovraccarichi di argomenti just per delegare internamente a from .

Il from(Iterable) offre anche un'opportunità interessante. Molte generazioni di valore possono essere espresse in una forma di macchina di stato. Ogni elemento richiesto attiva una transizione di stato e il calcolo del valore restituito.

La scrittura di macchine di stato come Iterable s è alquanto complicata (ma ancora più semplice della scrittura di un Observable per consumarla) ea differenza del C #, Java non ha alcun supporto dal compilatore per costruire tali macchine di stato semplicemente scrivendo codice di aspetto classico (con yield return e yield break ). Alcune librerie offrono qualche aiuto, come AbstractIterable Google Guava e Ix.generate() e Ix.forloop() . Questi sono di per sé degni di una serie completa, quindi vediamo una sorgente Iterable molto semplice che ripete indefinitamente un valore costante:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Se consumassimo l' iterator tramite il classico ciclo for, ciò comporterebbe un ciclo infinito. Dal momento che costruiamo un Observable fuori da esso, possiamo esprimere la nostra volontà di consumare solo i primi 5 di esso e quindi smettere di richiedere qualsiasi cosa. Questo è il vero potere di valutare e calcolare pigramente dentro s Observable .

creare (SyncOnSubscribe)

A volte, l'origine dei dati da convertire nel mondo reattivo stesso è sincrona (bloccante) e pull-like, vale a dire, dobbiamo chiamare qualche metodo get o read per ottenere il prossimo pezzo di dati. Si potrebbe, ovviamente, trasformarlo in un Iterable ma quando tali risorse sono associate a risorse, potremmo perdere tali risorse se il downstream annulla la sequenza prima che finisca.

Per gestire tali casi, RxJava ha la classe SyncOnSubscribe . Si può estenderlo e implementare i suoi metodi o usare uno dei suoi metodi di fabbrica basati su lambda per costruire un'istanza.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

In generale, SyncOnSubscribe utilizza 3 callback.

I primi callback consentono di creare uno stato per sottoscrittore, come FileInputStream nell'esempio; il file verrà aperto indipendentemente per ogni singolo utente.

Il secondo callback accetta questo oggetto stato e fornisce un Observer output i cui metodi onXXX possono essere chiamati per emettere valori. Questo callback viene eseguito tante volte quanto richiesto dal downstream. Ad ogni chiamata, deve chiamare onNext Al più al massimo una volta facoltativamente seguito da onError o onCompleted . Nell'esempio chiamiamo onCompleted() se il byte letto è negativo, indicante e fine del file, e chiama onError nel caso in cui il read lanci una IOException .

Il callback finale viene richiamato quando il downstream si annulla (chiudendo l'inputstream) o quando il callback precedente ha chiamato i metodi del terminale; consente di liberare risorse. Dal momento che non tutte le fonti hanno bisogno di tutte queste funzionalità, i metodi statici di SyncOnSubscribe consentono di creare istanze senza di esse.

Sfortunatamente, molte chiamate di metodo attraverso la JVM e altre librerie generano eccezioni controllate e devono essere racchiuse in esecuzioni try-catch quanto le interfacce funzionali utilizzate da questa classe non consentono il lancio di eccezioni controllate.

Naturalmente, possiamo imitare altre fonti tipiche, ad esempio un intervallo illimitato con esso:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

In questa configurazione, la current inizia con 0 e la volta successiva che viene richiamata la lambda, il parametro current ora contiene 1 .

Esiste una variante di SyncOnSubscribe chiamata AsyncOnSubscribe che sembra abbastanza simile con l'eccezione che il callback centrale richiede anche un valore lungo che rappresenta l'importo della richiesta da downstream e la callback dovrebbe generare un Observable con la stessa lunghezza esatta. Questa fonte quindi concatena tutti questi Observable in una singola sequenza.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

C'è una discussione (accesa) in corso sull'utilità di questa classe e generalmente non raccomandata perché rompe abitualmente le aspettative su come effettivamente emetterà quei valori generati e su come risponderà, o anche quale tipo di valori di richiesta riceverà in scenari consumer più complessi.

creare (emettitore)

A volte, la sorgente da includere in un Observable è già calda (come le mosse del mouse) o fredda ma non è backpressurable nella sua API (come un callback di rete asincrono).

Per gestire casi del genere, una versione recente di RxJava ha introdotto il metodo factory create(emitter) . Ci vogliono due parametri:

  • un callback che verrà chiamato con un'istanza dell'interfaccia Emitter<T> per ciascun utente in entrata,
  • un'enumerazione Emitter.BackpressureMode che impone allo sviluppatore di specificare il comportamento di contropressione da applicare. Ha le solite modalità, simili a onBackpressureXXX oltre a segnalare una MissingBackpressureException o semplicemente ignorare del tutto l'overflow al suo interno.

Si noti che al momento non supporta parametri aggiuntivi per tali modalità di contropressione. Se uno ha bisogno di quelli di personalizzazione, utilizzando NONE come modalità di contropressione e applicando le pertinenti onBackpressureXXX sul risultante Observable è la strada da percorrere.

Il primo caso tipico per il suo utilizzo quando si desidera interagire con una fonte basata su push, come gli eventi della GUI. Queste API presentano alcune forme di chiamate addListener / removeListener che è possibile utilizzare:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

L' Emitter è relativamente semplice da usare; si può chiamare su onNext , su onError e su onCompleted su di esso e l'operatore gestisce da solo la gestione della contropressione e della cancellazione. Inoltre, se l'API setCancellation supporta la cancellazione (come la rimozione del listener nell'esempio), è possibile utilizzare setCancellation (o setSubscription per le risorse simili a Subscription ) per registrare un callback di annullamento che viene richiamato quando l'annullamento di sottoscrizione o l' onError / onCompleted viene chiamato onCompleted Emitter fornita.

Questi metodi consentono di associare una sola risorsa all'emettitore alla volta e impostarne una nuova annulla automaticamente la vecchia. Se si devono gestire più risorse, creare una CompositeSubscription , associarla all'emettitore e quindi aggiungere ulteriori risorse alla stessa CompositeSubscription :

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Il secondo scenario di solito prevede alcune API asincrone basate su callback che devono essere convertite in Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

In questo caso, la delega funziona allo stesso modo. Sfortunatamente, di solito queste classiche API di callback non supportano la cancellazione, ma se lo fanno, si può impostare la cancellazione proprio come negli esempi precedenti (con un modo forse più complesso). Si noti l'uso del LATEST modalità contropressione; se sappiamo che ci sarà solo un singolo valore, non abbiamo bisogno della strategia BUFFER poiché assegna un buffer lungo 128 elementi predefinito (che cresce come necessario) che non sarà mai completamente utilizzato.



Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow