Szukaj…


Wprowadzenie

W tym temacie pokazano przykłady i dokumentację dotyczącą reaktywnych koncepcji Flowable i Subscriber, które zostały wprowadzone w rxjava version2

Uwagi

przykład wymaga rxjava2 jako zależności, współrzędne maven dla używanej wersji to:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

przykład konsumenta producenta ze wsparciem przeciwciśnienia u producenta

TestProducer z tego przykładu tworzy obiekty Integer w danym zakresie i przekazuje je do swojego Subscriber . Rozszerza klasę Flowable<Integer> . Dla nowego subskrybenta tworzy obiekt Subscription którego metoda request(long) jest używana do tworzenia i publikowania wartości całkowitych.

Jest to ważne dla Subscription , który jest przekazywany do subscriber , że request() metoda, która zwraca onNext() na abonenta może być wywołana z rekurencyjnie w tym onNext() rozmowy. Aby zapobiec przepełnieniu stosu, pokazana implementacja używa licznika outStandingRequests i flagi isProducing .

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

Konsument w tym przykładzie rozszerza DefaultSubscriber<Integer> a na początku i po zużyciu liczby całkowitej żąda następnego. Po zużyciu wartości całkowitych występuje niewielkie opóźnienie, więc producent wytworzy przeciwciśnienie.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

w następującej głównej metodzie klasy testowej producent i konsument są tworzeni i podłączani:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

Podczas uruchamiania przykładu plik dziennika pokazuje, że konsument działa nieprzerwanie, podczas gdy producent staje się aktywny tylko wtedy, gdy wewnętrzny bufor przepływności rxjava2 wymaga ponownego napełnienia.



Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow