Sök…


Introduktion

Detta ämne visar exempel och dokumentation med avseende på de reaktiva koncepten för Flowable och Subscribe som introducerades i rxjava version2

Anmärkningar

exemplet behöver rxjava2 som ett beroende, de maven koordinaterna för den använda versionen är:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

producentkonsumentexempel med stöd för mottryck hos producenten

TestProducer från detta exempel producerar Integer inom ett visst intervall och skjuter dem till dess Subscriber . Det utökar Flowable<Integer> -klassen. För en ny prenumerant skapar det ett Subscription vars request(long) metod används för att skapa och publicera heltal.

Det är viktigt för Subscription som skickas till subscriber att request() -metoden som ringer onNext() på abonnenten kan rekursivt anropas från detta onNext() . För att förhindra ett outStandingRequests använder den visade implementeringen outStandingRequests räknaren och isProducing flaggan.

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

Konsumenten i detta exempel utökar DefaultSubscriber<Integer> och vid start och efter att ha konsumerat ett heltal begär det nästa. När du konsumerar heltalvärdena finns det en liten fördröjning, så mottrycket kommer att byggas upp för tillverkaren.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

i följande huvudmetod för en testklass skapas och kopplas upp producenten och konsumenten:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

När exemplet körs visar loggfilen att konsumenten körs kontinuerligt medan producenten bara blir aktiv när den interna flödbara bufferten från rxjava2 måste fyllas på.



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow