Suche…


Erstellen Sie ein Observable

Es gibt mehrere Möglichkeiten, ein Observable in RxJava zu erstellen. Die leistungsfähigste Methode ist die Verwendung der Observable.create Methode. Es ist aber auch der komplizierteste Weg . Sie müssen es daher so weit wie möglich vermeiden .

Einen hervorragenden Wert ausgeben

Wenn Sie bereits einen Wert haben, können Sie Observable.just , um Ihren Wert Observable.just .

 Observable.just("Hello World").subscribe(System.out::println);

Einen Wert ausgeben, der berechnet werden soll

Wenn Sie einen Wert ausgeben möchten, der noch nicht berechnet wurde oder dessen Berechnung lange dauern kann, können Sie mit Observable.fromCallable den nächsten Wert ausgeben.

Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);

longComputation() wird nur aufgerufen, wenn Sie Ihr Observable abonnieren. Auf diese Weise wird die Berechnung faul .

Alternative zum Senden eines Wertes, der berechnet werden soll

Observable.defer ein Observable wie Observable.fromCallable , es wird jedoch verwendet, wenn Sie statt eines Werts ein Observable zurückgeben müssen. Dies ist nützlich, wenn Sie die Fehler in Ihrem Anruf verwalten möchten.

Observable.defer(() -> {
          try {
                return Observable.just(longComputation());
          } catch(SpecificException e) {
                return Observable.error(e);
          }).subscribe(System.out::println);

Warme und kalte Observables

Observables werden in Abhängigkeit von ihrem Emissionsverhalten allgemein als Hot oder Cold klassifiziert.
Ein Cold Observable ist dasjenige, das auf Anfrage zu Cold Observable beginnt (Abonnement), während ein Hot Observable unabhängig von Abonnements aussendet.

Kalt beobachtbar

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2

Die Ausgabe des obigen Codes sieht folgendermaßen aus (kann variieren):

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

Beachten Sie, dass obwohl sub2 spät startet, die Werte von Anfang an empfangen werden. Cold Observable gibt ein Cold Observable nur Elemente aus, wenn dies verlangt wird. Mehrfachanforderung startet mehrere Pipelines.

Heiß beobachtbar

Hinweis: Heiße Observables geben Werte unabhängig von den einzelnen Abonnements aus. Sie haben ihre eigene Zeitleiste und Ereignisse treten auf, egal ob jemand zuhört oder nicht.

Ein Cold Observale kann mit einem einfachen publish in ein Hot Observable umgewandelt werden.

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish gibt ein ConnectableObservable , das Funktionen zum Verbinden und Trennen der Verbindung mit dem Observable hinzufügt.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));

Die oben genannten Erträge:

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

sub2 beachte, dass, obwohl sub2 spät zu beobachten beginnt, es mit 1 sub1 .
Die Trennung ist etwas komplizierter! Das Trennen der Verbindung erfolgt beim Subscription und nicht beim Observable .

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

Das oben genannte produziert:

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

Beim Trennen der Verbindung wird Observable Wesentlichen beendet und neu gestartet, wenn ein neues Abonnement hinzugefügt wird.

Hot Observable kann zum Erstellen eines EventBus . Solche EventBusse sind im Allgemeinen leicht und superschnell. Der einzige Nachteil eines RxBus besteht darin, dass alle Ereignisse manuell implementiert und an den Bus übergeben werden müssen.



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow