Buscar..


Introducción

Este tema muestra ejemplos y documentación con respecto a los conceptos reactivos de Flowable y Subscriber que se introdujeron en rxjava versión 2.

Observaciones

el ejemplo necesita rxjava2 como una dependencia, las coordenadas de Maven para la versión utilizada son:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

Ejemplo de consumidor productor con soporte de contrapresión en el productor.

El TestProducer de este ejemplo produce objetos Integer en un rango dado y los empuja a su Subscriber . Extiende la clase Flowable<Integer> . Para un nuevo suscriptor, crea un objeto de Subscription cuyo método de request(long) se utiliza para crear y publicar los valores de enteros.

Es importante para la Subscription que se transmite al subscriber que el método request() que llama onNext() en el suscriptor se puede llamar recursivamente desde esta llamada onNext() . Para evitar un desbordamiento de pila, la implementación que se muestra utiliza el contador outStandingRequests y el indicador isProducing .

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

El Consumidor en este ejemplo extiende DefaultSubscriber<Integer> y en el inicio y después de consumir un Integer solicita el siguiente. Al consumir los valores de Integer, hay un pequeño retraso, por lo que se creará la contrapresión para el productor.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

En el siguiente método principal de una clase de prueba, el productor y el consumidor se crean y conectan:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

Cuando se ejecuta el ejemplo, el archivo de registro muestra que el consumidor se ejecuta continuamente, mientras que el productor solo se activa cuando se debe rellenar el búfer interno Flowable de rxjava2.



Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow