수색…


소개

이 항목에서는 rxjava version2에서 소개 된 Flowable 및 Subscriber의 사후 대응 개념과 관련된 예제 및 문서를 보여줍니다.

비고

이 예제에서는 rxjava2가 종속성으로 필요하며 사용 된 버전의 관리 좌표는 다음과 같습니다.

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

생산자에서 배압을 지원하는 생산자 소비자 사례

이 예제의 TestProducer 는 주어진 범위에서 Integer 개체를 생성하고 Subscriber 개체로 푸시합니다. Flowable<Integer> 클래스를 확장합니다. 새 구독자의 request(long) 메서드를 사용하여 Integer 값을 만들고 게시하는 Subscription 개체를 만듭니다.

그것은 중요하다 Subscription 에 전달되는 subscriber request() 메소드 호출 onNext() 가입자에 재귀이 내에서 호출 할 수 있습니다 onNext() 호출. 스택 오버 플로우를 방지하기 위해, 도시 된 구현은 사용 outStandingRequests 카운터와 isProducing 플래그.

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

이 예제에서 Consumer는 DefaultSubscriber<Integer> 를 확장하고 DefaultSubscriber<Integer> 를 사용하면 시작 및 다음에 다음 요청을 요청합니다. Integer 값을 소비 할 때 약간의 지연이 있기 때문에 배압은 생산자를 위해 구성됩니다.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

테스트 클래스의 다음 주요 메소드에서 생산자와 소비자가 생성되고 유선화됩니다.

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

예제를 실행할 때 로그 파일은 소비자가 계속 실행되는 것을 보여 주지만 rxjava2의 내부 Flowable 버퍼가 다시 채워질 필요가있을 때만 생성자가 활성화됩니다.



Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow