Suche…


Einführung

Dieses Thema zeigt Beispiele und Dokumentation zu den reaktiven Konzepten von Flowable und Subscriber, die in rxjava Version2 eingeführt wurden

Bemerkungen

Das Beispiel benötigt rxjava2 als Abhängigkeit, die Maven-Koordinaten für die verwendete Version sind:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

Beispiel eines Produzenten-Verbrauchers mit Unterstützung des Gegendrucks beim Hersteller

Der TestProducer aus diesem Beispiel erzeugt Integer Objekte in einem bestimmten Bereich und überträgt sie an seinen Subscriber . Es erweitert die Klasse Flowable<Integer> . Bei einem neuen Abonnenten wird ein Subscription , dessen Methode request(long) zum Erstellen und Veröffentlichen der Integer-Werte verwendet wird.

Für das Subscription , das an den subscriber , ist es wichtig, dass die request() Methode, die onNext() des Abonnenten onNext() aus diesem onNext() Aufruf rekursiv aufgerufen werden kann. Um einen Stapelüberlauf zu verhindern, verwendet die gezeigte Implementierung den Zähler outStandingRequests und das Flag isProducing .

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

Der Consumer in diesem Beispiel erweitert DefaultSubscriber<Integer> und fordert beim Start und nach Verbrauch einer Integer-Anforderung die nächste an. Beim Verbrauch der Integer-Werte tritt eine kleine Verzögerung auf, sodass der Gegendruck für den Hersteller aufgebaut wird.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

In der folgenden Hauptmethode einer Testklasse werden Hersteller und Verbraucher erstellt und verkabelt:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

Beim Ausführen des Beispiels zeigt die Protokolldatei, dass der Consumer kontinuierlich ausgeführt wird, während der Produzent nur dann aktiv wird, wenn der interne fließfähige Puffer von rxjava2 nachgefüllt werden muss.



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow