rx-java
RxJava2 फ़्लोएबल और सब्सक्राइबर
खोज…
परिचय
यह विषय Rxjava संस्करण 2 में पेश किए गए फ़्लोएबल और सब्सक्राइबर की प्रतिक्रियात्मक अवधारणाओं के संबंध में उदाहरण और प्रलेखन दिखाता है
टिप्पणियों
उदाहरण के लिए एक निर्भरता के रूप में rxjava2 की जरूरत है, उपयोग किए गए संस्करण के लिए मावेन निर्देशांक हैं:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
निर्माता उपभोक्ता उदाहरण के साथ निर्माता में backpressure समर्थन करते हैं
इस उदाहरण से TestProducer किसी दिए गए श्रेणी में Integer ऑब्जेक्ट्स का उत्पादन करता है और उन्हें अपने Subscriber लिए धकेलता है। यह Flowable<Integer> क्लास का विस्तार करता है। एक नए सब्सक्राइबर के लिए, यह एक Subscription ऑब्जेक्ट बनाता है जिसका request(long) विधि का उपयोग पूर्णांक मान बनाने और प्रकाशित करने के लिए किया जाता है।
subscriber लिए पास की गई Subscription लिए यह महत्वपूर्ण है कि subscriber request() onNext() को कॉल करने वाले request() विधि को इस onNext() कॉल के भीतर से पुन: कॉल किया जा सके। स्टैक ओवरफ़्लो को रोकने के लिए, दिखाया गया कार्यान्वयन outStandingRequests काउंटर और isProducing ध्वज का उपयोग करता है।
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
इस उदाहरण में उपभोक्ता DefaultSubscriber<Integer> विस्तार करता है और शुरू होने के बाद और बाद में एक Integer अनुरोध करता है। इंटेगर मूल्यों का उपभोग करने पर, थोड़ा विलंब होता है, इसलिए निर्माता के लिए बैकस्पेस का निर्माण किया जाएगा।
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
एक परीक्षण वर्ग के मुख्य विधि में निर्माता और उपभोक्ता को बनाया और वायर्ड किया जाता है:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
उदाहरण चलाते समय, लॉगफ़ाइल दिखाता है कि उपभोक्ता लगातार चलता है, जबकि निर्माता केवल तभी सक्रिय होता है जब rxjava2 के आंतरिक फ़्लोएबल बफर को फिर से भरने की आवश्यकता होती है।