Recherche…


introduction

La contre - pression se produit lorsque, dans un pipeline de traitement Observable , certaines étapes asynchrones ne peuvent pas traiter les valeurs assez rapidement et nécessitent un moyen de ralentir le producteur en amont.

Le cas classique du besoin de contre-pression est lorsque le producteur est une source chaude:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

Dans cet exemple, le thread principal produira 1 million d'éléments pour un consommateur final qui le traite sur un thread d'arrière-plan. Il est probable que la méthode compute(int) prenne un certain temps, mais la surcharge de la chaîne d'opérateurs Observable peut également augmenter le temps nécessaire au traitement des éléments. Cependant, le thread produisant avec la boucle for ne peut pas le savoir et continue à onNext .

En interne, les opérateurs asynchrones ont des tampons pour contenir de tels éléments jusqu'à ce qu'ils puissent être traités. Dans le Rx.NET classique et au début du RxJava, ces tampons étaient sans limites, ce qui signifiait qu'ils contiendraient probablement presque tous les 1 million d'éléments de l'exemple. Le problème commence lorsque, par exemple, 1 milliard d'éléments ou 1 million de séquences apparaissent 1000 fois dans un programme, conduisant à OutOfMemoryError et généralement à des ralentissements dus à une surcharge excessive du GC.

Semblable à la façon dont la gestion des erreurs est devenue un citoyen de première classe et a reçu des opérateurs (via les opérateurs onErrorXXX ), la contre-pression est une autre propriété des flux de données à laquelle le programmeur doit penser (via les opérateurs onBackpressureXXX ).

Au-delà du PublishSubject ci-dessus, d'autres opérateurs ne prennent pas en charge la contre-pression, principalement pour des raisons fonctionnelles. Par exemple, l' interval l'opérateur émet des valeurs périodiquement, ce qui entraîne une modification de la période par rapport à une horloge murale.

Dans RxJava moderne, la plupart des opérateurs asynchrones ont maintenant un tampon interne limité, comme observeOn ci-dessus et toute tentative de débordement de ce tampon mettra fin à la séquence entière avec MissingBackpressureException . La documentation de chaque opérateur a une description de son comportement de contre-pression.

Cependant, la contre-pression est présente de manière plus subtile dans les séquences froides régulières (qui ne doivent pas et ne doivent pas générer une MissingBackpressureException ). Si le premier exemple est réécrit:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Il n'y a pas d'erreur et tout fonctionne sans problème avec une faible utilisation de la mémoire. La raison en est que de nombreux opérateurs sources peuvent "générer" des valeurs à la demande et ainsi l'opérateur observeOn peut indiquer que la range génère autant de valeurs que le tampon observeOn peut contenir en une fois sans dépassement.

Cette négociation est basée sur le concept informatique de co-routines (je vous appelle, vous m'appelez). L'opérateur range envoie un rappel, sous la forme d'une mise en œuvre du Producer interface, à l' observeOn en appelant son (intérieur Subscriber de) setProducer . En retour, le observeOn appelle Producer.request(n) avec une valeur pour indiquer la range qu'il est autorisé à produire (c'est-à-dire, onNext it) autant d'éléments supplémentaires . Il est alors de la observeOn de observeOn d'appeler la méthode de request au bon moment et avec la bonne valeur pour que les données continuent à circuler mais pas à saturation.

L'expression de la contre-pression chez les consommateurs finaux est rarement nécessaire (car ils sont synchrones par rapport à leur amont immédiat et la contre-pression se produit naturellement par le blocage de la pile d'appels), mais il peut être plus facile de comprendre son fonctionnement:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Ici, l'implémentation onStart indique la range pour produire sa première valeur, qui est ensuite reçue dans onNext . Une fois que le compute(int) terminé, l'autre valeur est alors demandée à partir de la range . Dans une implémentation naïve de la range , un tel appel appelle récursivement onNext , conduisant à StackOverflowError ce qui est évidemment indésirable.

Pour éviter cela, les opérateurs utilisent une logique dite de trampoline qui empêche de tels appels réentrants. En termes de range , il se souviendra qu'il y avait un request(1) alors qu'il appelait onNext() et qu'une fois que onNext() reviendrait, il ferait un autre tour et appellerait onNext() avec la valeur entière suivante. Par conséquent, si les deux sont échangés, l'exemple fonctionne toujours de la même façon:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Cependant, ce n'est pas le cas pour onStart . Bien que l’infrastructure Observable garantisse qu’elle sera appelée au plus une fois sur chaque Subscriber , l’appel à la request(1) peut déclencher immédiatement l’émission d’un élément. Si l'on dispose d'une logique d'initialisation après l'appel à la request(1) nécessaire à onNext , vous pouvez vous retrouver avec des exceptions:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

Dans ce cas synchrone, une NullPointerException sera lancée immédiatement lors de l'exécution de onStart . Un bug plus subtil se produit si l'appel à la request(1) déclenche un appel asynchrone à onNext sur un autre thread et lit le name dans les courses onNext écrivant dans onStart request publication onStart .

Par conséquent, il faut faire toutes les initialisations de champs dans onStart ou même avant cela et appeler request() last. Les implémentations de request() dans les opérateurs assurent une relation correcte avant ou après (ou en d'autres termes, une libération de mémoire ou une clôture complète) si nécessaire.

Les opérateurs onBackpressureXXX

La plupart des développeurs rencontrent une contre-pression lorsque leur application échoue avec MissingBackpressureException exception MissingBackpressureException et que l'exception pointe généralement vers l'opérateur observeOn . La cause réelle est généralement l'utilisation non rétroactive de PublishSubject , timer() ou interval() ou des opérateurs personnalisés créés via create() .

Il existe plusieurs manières de faire face à de telles situations.

Augmenter la taille des tampons

Parfois, de tels débordements se produisent en raison de sources explosives. Soudain, l'utilisateur tape trop rapidement sur l'écran et observeOn le tampon interne de 16 éléments par défaut d' observeOn sur les débordements d'Android.

La plupart des opérateurs sensibles à la contre-pression dans les versions récentes de RxJava permettent désormais aux programmeurs de spécifier la taille de leurs tampons internes. Les paramètres pertinents sont généralement appelés bufferSize , prefetch ou capacityHint . Compte tenu de l'exemple débordant dans l'introduction, nous pouvons simplement augmenter la taille du tampon d' observeOn pour avoir assez de place pour toutes les valeurs.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Notez toutefois que généralement, cela peut être seulement une solution temporaire, car le dépassement de capacité peut toujours se produire si la source surproduit la taille de la mémoire tampon prévue. Dans ce cas, on peut utiliser l'un des opérateurs suivants.

Valeurs par lots / sauts avec les opérateurs standard

MissingBackpressureException les données source peuvent être traitées plus efficacement par lots, il est possible de réduire le risque d' MissingBackpressureException en utilisant l'un des opérateurs de traitement par lots standard (par taille et / ou par heure).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Si certaines des valeurs peuvent être ignorées en toute sécurité, on peut utiliser l'échantillonnage (avec le temps ou un autre Observable) et les opérateurs de limitation ( throttleFirst , throttleLast , throttleWithTimeout ).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Notez que ces opérateurs ne réduisent que le taux de réception de la valeur en aval et peuvent donc conduire à une MissingBackpressureException .

onBackpressureBuffer ()

Cet opérateur sous sa forme sans paramètre réintroduit un tampon non limité entre la source en amont et l'opérateur en aval. Être sans limite signifie que tant que la JVM ne manque pas de mémoire, elle peut gérer presque n'importe quelle quantité provenant d'une source de données en rafale.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

Dans cet exemple, la observeOn va avec une taille de tampon très faible, mais il n'y a aucune MissingBackpressureException car onBackpressureBuffer absorbe toutes les 1 million de valeurs et en observeOn petits lots à observeOn .

Notez cependant que onBackpressureBuffer consomme sa source de manière illimitée, c'est-à-dire sans y appliquer aucune contre-pression. Cela a pour conséquence que même une source de support de contre-pression telle que la range sera complètement réalisée.

Il y a 4 surcharges supplémentaires de onBackpressureBuffer

onBackpressureBuffer (capacité int)

C'est une version limitée qui signale BufferOverflowError au cas où son tampon atteindrait la capacité donnée.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

La pertinence de cet opérateur diminue à mesure que de plus en plus d’opérateurs permettent de définir leurs tailles de tampons. Pour le reste, cela donne l'occasion d'étendre leur tampon interne en ayant un nombre plus grand avec onBackpressureBuffer que leur valeur par défaut.

onBackpressureBuffer (capacité int, Action0 onOverflow)

Cette surcharge appelle une action (partagée) en cas de dépassement de capacité. Son utilité est plutôt limitée car il n'y a pas d'autres informations sur le débordement que la pile d'appels actuelle.

onBackpressureBuffer (capacité int, action0 onOverflow, stratégie BackpressureOverflow.Strategy)

Cette surcharge est en fait plus utile car elle permet de définir ce qu’il faut faire au cas où la capacité serait atteinte. Le BackpressureOverflow.Strategy est une interface, mais la classe BackpressureOverflow offre 4 champs statiques avec des implémentations représentant des actions typiques:

  • ON_OVERFLOW_ERROR : il s'agit du comportement par défaut des deux surcharges précédentes, signalant une BufferOverflowException
  • ON_OVERFLOW_DEFAULT : actuellement, il est identique à ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : si un débordement se produisait, la valeur actuelle serait simplement ignorée et seules les anciennes valeurs seraient délivrées une fois les requêtes en aval.
  • ON_OVERFLOW_DROP_OLDEST : supprime l'élément le plus ancien du tampon et lui ajoute la valeur actuelle.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Notez que les deux dernières stratégies entraînent une discontinuité dans le flux lorsqu'elles suppriment des éléments. De plus, ils ne signaleront pas BufferOverflowException .

onBackpressureDrop ()

Chaque fois que l'aval n'est pas prêt à recevoir des valeurs, cet opérateur déposera cet élément de la séquence. On peut le considérer comme une capacité onBackpressureBuffer 0 avec la stratégie ON_OVERFLOW_DROP_LATEST .

Cet opérateur est utile lorsque l'on peut ignorer en toute sécurité des valeurs provenant d'une source (telles que les déplacements de souris ou les signaux de position GPS actuels), car des valeurs plus récentes seront disponibles ultérieurement.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Cela peut être utile en conjonction avec l' interval() opérateur source interval() . Par exemple, si vous souhaitez effectuer une tâche d'arrière-plan périodique mais que chaque itération peut durer plus longtemps que la période, il est conseillé de supprimer la notification d'intervalle supplémentaire car il y en aura plus tard:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Il existe une surcharge de cet opérateur: onBackpressureDrop(Action1<? super T> onDrop) où l'action (partagée) est appelée avec la valeur en cours de suppression. Cette variante permet de nettoyer les valeurs elles-mêmes (par exemple, libérer des ressources associées).

onBackpressureLatest ()

L'opérateur final conserve uniquement la dernière valeur et remplace pratiquement les anciennes valeurs non livrées. On peut penser à ceci comme une variante du onBackpressureBuffer avec une capacité de 1 et une stratégie de ON_OVERFLOW_DROP_OLDEST .

Contrairement à onBackpressureDrop il existe toujours une valeur disponible pour la consommation si l'aval est en retard. Cela peut être utile dans certaines situations de télémétrie où les données peuvent être irrégulières, mais seule la toute dernière est intéressante pour le traitement.

Par exemple, si l'utilisateur clique beaucoup sur l'écran, nous souhaiterions toujours réagir à sa dernière entrée.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

L'utilisation de onBackpressureDrop dans ce cas conduirait à une situation où le tout dernier clic serait déposé et laisserait l'utilisateur se demander pourquoi la logique métier n'était pas exécutée.

Création de sources de données sous pression

La création de sources de données à contre-courant est la tâche relativement facile pour gérer la contre-pression en général, car la bibliothèque propose déjà des méthodes statiques sur Observable qui gèrent la contre-pression du développeur. Nous pouvons distinguer deux types de méthodes d'usine: les «générateurs» à froid qui renvoient et génèrent des éléments basés sur la demande en aval et les «pousseurs» chauds qui relient généralement les sources de données non réactives et non réversibles. leur.

juste

La source la plus consciente de base est créé contrepression via just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Puisque nous ne demandons explicitement pas dans onStart , cela n'imprimera rien. just est génial quand il y a une valeur constante , nous aimerions relancer une séquence.

Malheureusement, just est souvent confondu avec un moyen de calculer quelque chose de dynamique à être consommé par l' Subscriber s:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Étonnant à certains, cela imprime 1 fois au lieu d’imprimer 1 et 2 respectivement. Si l'appel est réécrit, il devient évident pourquoi cela fonctionne:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue est appelée dans le cadre de la routine principale et non en réponse aux abonnés abonnés.

fromCallable

Ce dont les gens ont réellement besoin, c'est de la méthode fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Ici, la valeur computeValue n'est exécutée que lorsqu'un abonné souscrit et pour chacun d'eux, en imprimant les fromCallable attendues 1 et 2. Naturellement, fromCallable également correctement en charge la contre-pression et n'émettra pas la valeur calculée sauf demande. Notez cependant que le calcul se produit quand même. Dans le cas où le calcul lui-même devrait être retardé jusqu'à ce que l'aval demande réellement, nous pouvons utiliser just map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just va just pas émettre sa valeur constante jusqu'à ce qu'il soit demandé quand il est mappé au résultat de la valeur de computeValue , toujours appelé pour chaque abonné individuellement.

de

Si les données sont déjà disponibles comme un tableau d'objets, une liste d'objets ou d' une Iterable source respectif from surcharges manipuleront la contre - pression et l' émission de ces sources:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Pour plus de commodité (et en évitant les mises en garde sur la création de tableau générique) il y a 2 à 10 argument de surcharges just que déléguer en interne à from .

Le from(Iterable) donne également une opportunité intéressante. Beaucoup de génération de valeur peuvent être exprimées sous la forme d'une machine à états. Chaque élément demandé déclenche une transition d'état et un calcul de la valeur renvoyée.

L' écriture de ces machines d'état comme Iterable s est un peu compliqué (mais encore plus facile que d' écrire un Observable pour consommer) et contrairement à C #, Java ne pas le soutien du compilateur pour construire ces machines d'état en écrivant le code à la recherche classique (avec un yield return et yield break ). Certaines bibliothèques offrent une aide, telle que AbstractIterable Google Guava et Ix.generate() et Ix.forloop() . Celles-ci sont dignes d'une série complète, voyons donc une source Iterable très basique qui répète indéfiniment une valeur constante:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Si nous consommions l' iterator via for-loop classique, cela entraînerait une boucle infinie. Puisque nous en construisons une Observable , nous pouvons exprimer notre volonté de ne consommer que les cinq premiers, puis cesser de demander quoi que ce soit. C'est le véritable pouvoir d'évaluation et de calcul paresseux à l'intérieur de Observable .

créer (SyncOnSubscribe)

Parfois, la source de données à convertir dans le monde réactif lui-même est synchrone (bloquant) et ressemble à un pull, c'est-à-dire que nous devons appeler une méthode get ou read pour obtenir le prochain élément de données. On pourrait, bien sûr, transformer cela en une Iterable mais lorsque de telles sources sont associées à des ressources, nous pouvons perdre ces ressources si l’aval se désabonne de la séquence avant sa fin.

Pour gérer de tels cas, RxJava a la classe SyncOnSubscribe . On peut l'étendre et implémenter ses méthodes ou utiliser l'une de ses méthodes d'usine basées sur lambda pour créer une instance.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

SyncOnSubscribe utilise généralement 3 rappels.

Les premiers rappels permettent de créer un état par abonné, tel que FileInputStream dans l'exemple; le fichier sera ouvert indépendamment à chaque abonné individuel.

Le second rappel prend cet objet d'état et fournit une sortie Observer dont les méthodes onXXX peuvent être appelées pour émettre des valeurs. Ce rappel est exécuté autant de fois que l’aval demandé. A chaque invocation, il doit appeler onNext au plus une fois éventuellement suivi par onError ou onCompleted . Dans l'exemple, nous appelons onCompleted() si l'octet de lecture est négatif, en indiquant et en fin de fichier, et appelons onError si la lecture renvoie une IOException .

Le rappel final est appelé lorsque la désabonnement en aval (fermeture du flux d'entrée) ou lorsque le rappel précédent a appelé les méthodes du terminal; cela permet de libérer des ressources. Toutes les sources n'ayant pas besoin de toutes ces fonctionnalités, les méthodes statiques de SyncOnSubscribe permettent de créer des instances sans elles.

Malheureusement, de nombreux appels de méthodes à travers la JVM et d'autres bibliothèques lancent des exceptions vérifiées et doivent être intégrés dans des try-catch car les interfaces fonctionnelles utilisées par cette classe ne permettent pas de lancer des exceptions vérifiées.

Bien sûr, nous pouvons imiter d’autres sources typiques, telles qu’une gamme sans limites:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

Dans cette configuration, le current commence avec 0 et la prochaine fois que le lambda est invoqué, le paramètre current est maintenant 1 .

Il existe une variante de SyncOnSubscribe appelée AsyncOnSubscribe qui ressemble beaucoup à l'exception que le callback du milieu prend également une valeur longue qui représente le montant de la requête en aval et que le rappel doit générer une Observable de même longueur. Cette source concatène ensuite tous ces Observable en une seule séquence.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Il y a une discussion en cours (passionnée) sur l'utilité de cette classe et généralement non recommandée car elle rompt régulièrement les attentes sur la façon dont elle va réellement émettre ces valeurs générées et comment elle va répondre, ou même sur le type de demande qu'elle recevra. scénarios de consommation plus complexes.

créer (émetteur)

Parfois, la source à encapsuler dans un objet Observable est déjà chaude (comme les déplacements de souris) ou froide mais ne peut pas être rétrogradée dans son API (comme un rappel réseau asynchrone).

Pour gérer de tels cas, une version récente de RxJava a introduit la méthode de create(emitter) . Il faut deux paramètres:

  • un rappel qui sera appelé avec une instance de l'interface Emitter<T> pour chaque abonné entrant,
  • une énumération Emitter.BackpressureMode qui oblige le développeur à spécifier le comportement de contre-pression à appliquer. Il possède les modes habituels, similaires à onBackpressureXXX en plus de signaler une MissingBackpressureException ou d'ignorer simplement un tel débordement à l'intérieur.

Notez qu'il ne prend actuellement pas en charge les paramètres supplémentaires pour ces modes de contre-pression. Si vous avez besoin de cette personnalisation, vous devez utiliser NONE comme mode de contre-pression et appliquer le onBackpressureXXX approprié à l' Observable résultant.

Le premier cas typique d'utilisation de celui-ci lorsque l'on souhaite interagir avec une source basée sur les commandes, comme les événements d'interface graphique. Ces API comportent une forme d' addListener / removeListener que l'on peut utiliser:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

L' Emitter est relativement simple à utiliser; on peut appeler onNext , onError et onCompleted dessus et l'opérateur gère lui-même la gestion de la contre-pression et de la désinscription. En outre, si l’API encapsulée prend en charge l’annulation (telle que la suppression de l’écouteur dans l’exemple), on peut utiliser setCancellation (ou setSubscription pour Subscription like Ressource) pour enregistrer un rappel d’annulation qui est onError ou onError / onCompleted est appelé sur l'instance Emitter fournie.

Ces méthodes permettent à une seule ressource d'être associée à l'émetteur à la fois et de définir une nouvelle pour désabonner l'ancienne. Si vous devez gérer plusieurs ressources, créez un CompositeSubscription , associez-le à l'émetteur, puis ajoutez des ressources supplémentaires à CompositeSubscription :

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Le second scénario implique généralement une API asynchrone basée sur le rappel qui doit être convertie en une Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

Dans ce cas, la délégation fonctionne de la même manière. Malheureusement, ces API classiques de style callback ne prennent généralement pas en charge l'annulation, mais si tel est le cas, vous pouvez configurer leur annulation comme dans les exemples précédents (avec peut-être un moyen plus complexe). Notez l'utilisation du LATEST mode contre - pression; Si nous savons qu’il n’y aura qu’une seule valeur, nous n’avons pas besoin de la stratégie BUFFER car elle alloue un tampon long de 128 éléments par défaut (qui croît au besoin) qui ne sera jamais pleinement utilisé.



Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow