サーチ…


前書き

このトピックでは、rxjava version2で導入されたフローティングとサブスクライバの反応概念に関する例とドキュメントを示します。

備考

この例では依存関係としてrxjava2が必要です。使用されるバージョンのmaven座標は次のとおりです。

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

プロデューサーのバックプレッシャーをサポートするプロデューサーの消費者の例

この例のTestProducerは、指定された範囲内のIntegerオブジェクトを生成し、 Subscriberプッシュします。 Flowable<Integer>クラスを拡張します。新しいサブスクライバの場合、 request(long)メソッドを使用してInteger値を作成および公開するSubscriptionオブジェクトを作成します。

subscriber渡されるSubscriptionでは、 subscriber onNext()メソッドを呼び出すrequest()メソッドをこのonNext()コールから再帰的に呼び出すことがonNext()です。スタックのオーバーフローを防ぐために、実装ではoutStandingRequestsカウンタとisProducingフラグを使用します。

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

この例のConsumerはDefaultSubscriber<Integer>を拡張DefaultSubscriber<Integer> 、開始時と終了時に整数を消費すると次の値を要求します。 Integer値を消費すると少し遅れるので、背圧はプロデューサのために構築されます。

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

テストクラスの次のメインメソッドでは、プロデューサとコンシューマが作成され、配線されます。

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

この例を実行すると、ログファイルにはコンシューマが継続的に実行され、rxjava2の内部Flowableバッファをリフィルする必要がある場合にのみプロデューサがアクティブになります。



Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow