खोज…


परिचय

यह विषय Rxjava संस्करण 2 में पेश किए गए फ़्लोएबल और सब्सक्राइबर की प्रतिक्रियात्मक अवधारणाओं के संबंध में उदाहरण और प्रलेखन दिखाता है

टिप्पणियों

उदाहरण के लिए एक निर्भरता के रूप में rxjava2 की जरूरत है, उपयोग किए गए संस्करण के लिए मावेन निर्देशांक हैं:

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.8</version>
    </dependency>

निर्माता उपभोक्ता उदाहरण के साथ निर्माता में backpressure समर्थन करते हैं

इस उदाहरण से TestProducer किसी दिए गए श्रेणी में Integer ऑब्जेक्ट्स का उत्पादन करता है और उन्हें अपने Subscriber लिए धकेलता है। यह Flowable<Integer> क्लास का विस्तार करता है। एक नए सब्सक्राइबर के लिए, यह एक Subscription ऑब्जेक्ट बनाता है जिसका request(long) विधि का उपयोग पूर्णांक मान बनाने और प्रकाशित करने के लिए किया जाता है।

subscriber लिए पास की गई Subscription लिए यह महत्वपूर्ण है कि subscriber request() onNext() को कॉल करने वाले request() विधि को इस onNext() कॉल के भीतर से पुन: कॉल किया जा सके। स्टैक ओवरफ़्लो को रोकने के लिए, दिखाया गया कार्यान्वयन outStandingRequests काउंटर और isProducing ध्वज का उपयोग करता है।

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

इस उदाहरण में उपभोक्ता DefaultSubscriber<Integer> विस्तार करता है और शुरू होने के बाद और बाद में एक Integer अनुरोध करता है। इंटेगर मूल्यों का उपभोग करने पर, थोड़ा विलंब होता है, इसलिए निर्माता के लिए बैकस्पेस का निर्माण किया जाएगा।

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

एक परीक्षण वर्ग के मुख्य विधि में निर्माता और उपभोक्ता को बनाया और वायर्ड किया जाता है:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

उदाहरण चलाते समय, लॉगफ़ाइल दिखाता है कि उपभोक्ता लगातार चलता है, जबकि निर्माता केवल तभी सक्रिय होता है जब rxjava2 के आंतरिक फ़्लोएबल बफर को फिर से भरने की आवश्यकता होती है।



Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow