rx-java
ऊपर का दवाब
खोज…
परिचय
Backpressure तब होता है जब एक Observable प्रोसेसिंग पाइप लाइन में, कुछ अतुल्यकालिक चरण तेजी से मूल्यों को संसाधित नहीं कर सकते हैं और अपस्ट्रीम निर्माता को धीमा करने का तरीका बताने की आवश्यकता होती है।
बैकप्रेशर की आवश्यकता का क्लासिक मामला तब है जब निर्माता एक गर्म स्रोत है:
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
इस उदाहरण में, मुख्य धागा एक अंत उपभोक्ता को 1 मिलियन आइटम का उत्पादन करेगा जो इसे एक पृष्ठभूमि थ्रेड पर संसाधित कर रहा है। यह संभावना है कि compute(int) विधि में कुछ समय लगता है लेकिन Observable ऑपरेटर श्रृंखला के ओवरहेड को आइटम को संसाधित करने में लगने वाले समय में भी जोड़ा जा सकता है। हालाँकि, लूप के लिए उत्पादक थ्रेड यह नहीं जान सकता है और onNext रखता है।
आंतरिक रूप से, अतुल्यकालिक ऑपरेटरों के पास ऐसे तत्व रखने के लिए बफर होते हैं जब तक कि उन्हें संसाधित नहीं किया जा सकता है। शास्त्रीय Rx.NET और शुरुआती RxJava में, इन बफ़र्स को अनबाउंड किया गया था, जिसका अर्थ है कि वे उदाहरण से लगभग सभी 1 मिलियन तत्वों को धारण करेंगे। समस्या तब शुरू होती है जब उदाहरण के लिए, 1 बिलियन तत्व या एक ही 1 मिलियन अनुक्रम एक प्रोग्राम में 1000 बार दिखाई देता है, जिसके कारण OutOfMemoryError और जीसी ओवरहेड अत्यधिक होने के कारण आम तौर पर मंदी होती है।
इसी तरह से त्रुटि-हैंडलिंग एक प्रथम श्रेणी का नागरिक बन गया और इससे निपटने के लिए ऑपरेटरों को प्राप्त किया गया ( onErrorXXX ऑपरेटरों के माध्यम से), onErrorXXX की एक और संपत्ति है, जिसके बारे में प्रोग्रामर को सोचना पड़ता है और हैंडल ( onBackpressureXXX ऑपरेटरों) के माध्यम से करना onBackpressureXXX ।
PublishSubject से परे, ऐसे अन्य ऑपरेटर हैं जो PublishSubject समर्थन नहीं करते हैं, ज्यादातर कार्यात्मक कारणों के कारण। उदाहरण के लिए, ऑपरेटर interval समय-समय पर मूल्यों का उत्सर्जन करता है, बैकप्रेशरिंग यह एक दीवार घड़ी के सापेक्ष अवधि में शिफ्टिंग का कारण होगा।
आधुनिक RxJava में, अधिकांश अतुल्यकालिक ऑपरेटरों में अब एक observeOn आंतरिक बफर होता है, जैसा observeOn ऊपर से observeOn गया है और इस बफ़र को ओवरफ़्लो करने का कोई भी प्रयास संपूर्ण अनुक्रम को MissingBackpressureException साथ समाप्त कर MissingBackpressureException । प्रत्येक ऑपरेटर के दस्तावेज़ीकरण में उसके बैकपेचर व्यवहार के बारे में विवरण होता है।
हालाँकि, नियमित ठंडे क्रमों में MissingBackpressureException अधिक सूक्ष्म रूप से मौजूद होता है (जो MissingBackpressureException नहीं करता है और नहीं होना चाहिए)। यदि पहला उदाहरण फिर से लिखा गया है:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
कोई त्रुटि नहीं है और छोटे मेमोरी उपयोग के साथ सब कुछ आसानी से चलता है। इसका कारण यह है कि कई स्रोत ऑपरेटर मांग पर मूल्यों को "उत्पन्न" कर सकते हैं और इस प्रकार ऑपरेटर observeOn , range जेनरेट कर सकता है, observeOn यह बताया गया है कि observeOn बफर बिना एक ही समय में कितने मानों को धारण कर सकता है।
यह बातचीत सह-दिनचर्या के कंप्यूटर विज्ञान की अवधारणा पर आधारित है (मैं आपको फोन करता हूं, आप मुझे फोन करते हैं)। ऑपरेटर range , Producer इंटरफ़ेस के कार्यान्वयन के रूप में, कॉलबैक को उसके (इनर Subscriber के) setProducer को कॉल करके observeOn करने के लिए observeOn है। बदले में, observeOn Producer.request(n) को एक वैल्यू के साथ बताता है कि जिस range को प्रोड्यूस करने की इजाजत दी गई है (यानी, onNext ही) कई अतिरिक्त एलिमेंट्स। यह तो सही समय में request विधि को कॉल करने के लिए और डेटा को बहने नहीं रखने के लिए सही मूल्य के साथ observeOn करने की जिम्मेदारी है।
अंत-उपभोक्ताओं में बैकप्रेस को व्यक्त करना शायद ही कभी आवश्यक होता है (क्योंकि वे अपने तत्काल अपस्ट्रीम के संबंध में सिंक्रोनस होते हैं और बैकस्पेसर स्वाभाविक रूप से कॉल-स्टैक ब्लॉकिंग के कारण होता है), लेकिन इसके कामकाज को समझना आसान हो सकता है:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
यहां onStart कार्यान्वयन अपने पहले मूल्य का उत्पादन करने के लिए range को इंगित करता है, जो बाद में onNext में प्राप्त onNext । एक बार जब compute(int) खत्म हो जाता है, तो range से दूसरे मूल्य का अनुरोध किया जाता range । की एक भोली कार्यान्वयन में range , इस तरह के कॉल रिकर्सिवली कहेंगे onNext , के लिए अग्रणी StackOverflowError पाठ्यक्रम अवांछनीय की है।
इसे रोकने के लिए, ऑपरेटर तथाकथित ट्रैंपोलिनिंग लॉजिक का उपयोग करते हैं जो इस तरह के रीएन्ट्रेंट कॉल को रोकता है। range के संदर्भ में, यह याद रखेगा कि एक request(1) कॉल था, जबकि इसे onNext() और एक बार onNext() रिटर्न कहते हैं, यह अगले पूर्णांक मान के साथ एक और गोल और onNext() कॉल onNext() । इसलिए, यदि दोनों की अदला-बदली होती है, तो उदाहरण अभी भी काम करता है:
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
हालाँकि, यह onStart लिए सही नहीं है। यद्यपि Observable अवसंरचना की गारंटी है कि इसे प्रत्येक Subscriber पर सबसे अधिक बार कॉल किया जाएगा, request(1) करने के लिए कॉल request(1) तुरंत एक तत्व के उत्सर्जन को ट्रिगर कर सकता है। यदि किसी के पास कॉल करने के बाद आरंभीकरण तर्क है request(1) जो कि onNext द्वारा आवश्यक है, तो आप अपवादों के साथ समाप्त हो सकते हैं:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
इस तुल्यकालिक मामले में, NullPointerException को अभी भी onStart निष्पादित करते समय तुरंत फेंक दिया जाएगा। एक अधिक सूक्ष्म बग तब होता है जब request(1) करने के लिए कॉल request(1) किसी अन्य धागे पर onNext पर एक अतुल्यकालिक कॉल को ट्रिगर करता है और onNext दौड़ में पढ़ने का name इसे onStart पोस्ट request में onStart ।
इसलिए, किसी को onStart या उससे पहले के सभी फील्ड इनिशियलाइज़ेशन को करना चाहिए और कॉल request() अंतिम रूप देना चाहिए। ऑपरेटरों में request() कार्यान्वयन उचित होने से पहले संबंध सुनिश्चित करते हैं (या अन्य शब्दों में, मेमोरी रिलीज़ या पूर्ण बाड़) जब आवश्यक हो।
OnBackpressureXXX ऑपरेटरों
अधिकांश डेवलपर्स का सामना तब होता है जब उनका आवेदन MissingBackpressureException साथ विफल हो MissingBackpressureException और अपवाद आमतौर पर observeOn ऑपरेटर को observeOn करता है। वास्तविक कारण आमतौर पर PublishSubject , timer() या interval() या कस्टम ऑपरेटर्स द्वारा create() के गैर- PublishSubject उपयोग होता है।
ऐसी स्थितियों से निपटने के कई तरीके हैं।
बफर आकार में वृद्धि
कभी-कभी इस तरह के ओवरफ्लो, धमाकेदार स्रोतों के कारण होते हैं। अचानक, उपयोगकर्ता स्क्रीन को बहुत तेज़ी से टैप करता है और Android के ओवरफ्लो पर 16-तत्व के आंतरिक बफर का observeOn करता है।
RxJava के हाल के संस्करणों में अधिकांश बैकप्रेशर-संवेदनशील ऑपरेटर अब प्रोग्रामर को अपने आंतरिक बफ़र्स के आकार को निर्दिष्ट करने की अनुमति देते हैं। प्रासंगिक पैरामीटर आमतौर पर कहा जाता है bufferSize , prefetch या capacityHint । observeOn में अतिप्रवाहित उदाहरण को देखते हुए, हम सभी मानों के लिए पर्याप्त जगह होने के लिए बस observeOn का आकार बढ़ा सकते हैं।
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
.subscribe(e -> { }, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
ध्यान दें कि आम तौर पर, यह केवल एक अस्थायी सुधार हो सकता है क्योंकि ओवरफ्लो अभी भी हो सकता है यदि स्रोत अनुमानित बफर आकार को ओवरप्रोड्यूस करता है। इस मामले में, एक निम्नलिखित ऑपरेटरों में से किसी एक का उपयोग कर सकता है।
मानक ऑपरेटरों के साथ बैचिंग / लंघन मान
यदि स्रोत डेटा को बैच में अधिक कुशलतापूर्वक संसाधित किया जा सकता है, तो कोई मानक बैचिंग ऑपरेटरों (आकार और / या समय के अनुसार) का उपयोग करके MissingBackpressureException की संभावना को कम कर सकता है।
PublishSubject<Integer> source = PublishSubject.create();
source
.buffer(1024)
.observeOn(Schedulers.computation(), 1024)
.subscribe(list -> {
list.parallelStream().map(e -> e * e).first();
}, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
यदि कुछ मानों को सुरक्षित रूप से अनदेखा किया जा सकता है, तो नमूना (समय या किसी अन्य ऑब्जर्वेबल के साथ) और थ्रॉटलिंग ऑपरेटर ( throttleFirst , throttleLast , throttleWithTimeout ) का उपयोग कर सकते हैं।
PublishSubject<Integer> source = PublishSubject.create();
source
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation(), 1024)
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
ध्यान दें कि ये ऑपरेटर केवल डाउनस्ट्रीम द्वारा मूल्य रिसेप्शन की दर को कम करते हैं और इस प्रकार वे अभी भी MissingBackpressureException हो सकते हैं।
onBackpressureBuffer ()
यह पैरामीटर अपने पैरामीटर रहित रूप में अपस्ट्रीम स्रोत और डाउनस्ट्रीम ऑपरेटर के बीच एक अनबाउंड बफर को फिर से प्रस्तुत करता है। जब तक जेवीएम मेमोरी से बाहर नहीं निकलता है, तब तक अनबाउंड होने का मतलब है कि यह एक फालतू स्रोत से आने वाली लगभग किसी भी राशि को संभाल सकता है।
Observable.range(1, 1_000_000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), 8)
.subscribe(e -> { }, Throwable::printStackTrace);
इस उदाहरण में, observeOn एक बहुत ही कम बफर आकार के साथ जाता है फिर भी वहाँ कोई MissingBackpressureException नहीं है क्योंकि onBackpressureBuffer सभी 1 मिलियन मानों को observeOn और इसे observeOn करने के लिए छोटे बैचों को observeOn ।
हालांकि ध्यान दें कि onBackpressureBuffer अपने स्रोत को बिना किसी तरीके के उपयोग करता है, अर्थात यह किसी भी बैकपेचर को लागू किए बिना। इसका परिणाम यह होता है कि range जैसे बैकस्पेस-सपोर्टिंग सोर्स भी पूरी तरह से साकार हो जाएंगे।
onBackpressureBuffer 4 अतिरिक्त अधिभार हैं
onBackpressureBuffer (int क्षमता)
यह एक बाउंडेड वर्जन है जो बफर को दी गई क्षमता तक पहुंचने की स्थिति में BufferOverflowError का संकेत देता है।
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
इस ऑपरेटर की प्रासंगिकता कम होती जा रही है क्योंकि अधिक से अधिक ऑपरेटर अब अपने बफर आकार को सेट करने की अनुमति देते हैं। बाकी के लिए, यह उनके डिफ़ॉल्ट की तुलना में onBackpressureBuffer साथ एक बड़ी संख्या होने से "उनके आंतरिक बफर का विस्तार" करने का अवसर देता है।
onBackpressureBuffer (int क्षमता, Action0 onOverflow)
अतिप्रवाह होने की स्थिति में यह अधिभार एक (साझा) क्रिया कहता है। इसकी उपयोगिता सीमित है क्योंकि वर्तमान कॉल स्टैक की तुलना में अतिप्रवाह के बारे में कोई अन्य जानकारी प्रदान नहीं की गई है।
onBackpressureBuffer (int क्षमता, Action0 onOverflow, BackpressureOverflow.Strategy रणनीति)
यह अधिभार वास्तव में अधिक उपयोगी है क्योंकि यह इस बात को परिभाषित करता है कि क्षमता तक पहुँचने के मामले में क्या करना है। BackpressureOverflow.Strategy वास्तव में एक इंटरफ़ेस है लेकिन वर्ग BackpressureOverflow 4 स्टैटिक फ़ील्ड प्रदान करता है जिसके क्रियान्वयन के साथ यह विशिष्ट क्रियाओं का प्रतिनिधित्व करता है:
-
ON_OVERFLOW_ERROR: यह पिछले दो भार के के डिफ़ॉल्ट व्यवहार, एक संकेतBufferOverflowException -
ON_OVERFLOW_DEFAULT: वर्तमान में यहON_OVERFLOW_ERRORके समान है -
ON_OVERFLOW_DROP_LATEST: यदि अतिप्रवाह होगा, तो वर्तमान मूल्य को केवल अनदेखा किया जाएगा और डाउनस्ट्रीम अनुरोधों के बाद केवल पुराने मान वितरित किए जाएंगे। -
ON_OVERFLOW_DROP_OLDEST: बफर में सबसे पुराने तत्व कोON_OVERFLOW_DROP_OLDESTऔर इसमें वर्तमान मूल्य जोड़ता है।
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
ध्यान दें कि पिछली दो रणनीतियाँ धारा में असंतोष पैदा करती हैं क्योंकि वे तत्वों को छोड़ देते हैं। इसके अलावा, वे BufferOverflowException सिग्नल नहीं करेंगे।
onBackpressureDrop ()
जब भी डाउनस्ट्रीम मूल्यों को प्राप्त करने के लिए तैयार नहीं होता है, तो यह ऑपरेटर उस एलमेनसेट को अनुक्रम से छोड़ देगा। कोई इसे 0 क्षमता के रूप में सोच सकता है onBackpressureBuffer रणनीति के साथ ON_OVERFLOW_DROP_LATEST ।
यह ऑपरेटर तब उपयोगी होता है जब कोई स्रोत (जैसे माउस चाल या वर्तमान जीपीएस स्थान संकेत) से मूल्यों को सुरक्षित रूप से अनदेखा कर सकता है क्योंकि बाद में और अधिक अप-टू-डेट मान होंगे।
component.mouseMoves()
.onBackpressureDrop()
.observeOn(Schedulers.computation(), 1)
.subscribe(event -> compute(event.x, event.y));
यह स्रोत ऑपरेटर interval() के साथ संयोजन में उपयोगी हो सकता है। उदाहरण के लिए, यदि कोई कुछ आवधिक पृष्ठभूमि कार्य करना चाहता है, लेकिन प्रत्येक पुनरावृत्ति अवधि से अधिक समय तक रह सकता है, तो अतिरिक्त अंतराल अधिसूचना को छोड़ना सुरक्षित है क्योंकि बाद में और अधिक होगा:
Observable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);
इस ऑपरेटर का एक अधिभार मौजूद है: onBackpressureDrop(Action1<? super T> onDrop) जहां मूल्य को गिराए जाने के साथ (साझा) क्रिया कहा जाता है। यह संस्करण स्वयं मूल्यों को साफ करने की अनुमति देता है (उदाहरण के लिए, संबंधित संसाधनों को जारी करना)।
onBackpressureLatest ()
अंतिम संचालक केवल नवीनतम मूल्य रखता है और व्यावहारिक रूप से पुराने, अपरिवर्तित मूल्यों को अधिलेखित करता है। 1 की क्षमता और ON_OVERFLOW_DROP_OLDEST रणनीति के साथ onBackpressureBuffer एक संस्करण के रूप में इस पर विचार कर सकते हैं।
onBackpressureDrop विपरीत, खपत के लिए हमेशा एक मूल्य उपलब्ध होता है यदि डाउनस्ट्रीम पिछड़ रहा हो। यह कुछ टेलीमेट्री जैसी स्थितियों में उपयोगी हो सकता है जहां डेटा कुछ धमाकेदार पैटर्न में आ सकता है लेकिन प्रसंस्करण के लिए केवल बहुत ही नवीनतम दिलचस्प है।
उदाहरण के लिए, यदि उपयोगकर्ता स्क्रीन पर बहुत अधिक क्लिक करता है, तो हम उसके नवीनतम इनपुट पर प्रतिक्रिया करना चाहेंगे।
component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
इस मामले में onBackpressureDrop उपयोग से ऐसी स्थिति उत्पन्न होगी जहां बहुत अंतिम क्लिक ड्रॉप हो जाती है और उपयोगकर्ता को यह सोच कर छोड़ देता है कि व्यापार तर्क निष्पादित क्यों नहीं किया गया।
Backpressured डेटा स्रोत बनाना
सामान्य रूप से बैकप्रेशर से निपटने के दौरान बैकपर्स्ड डेटा स्रोत बनाना अपेक्षाकृत आसान काम है क्योंकि लाइब्रेरी पहले से ही Observable पर स्थिर तरीके प्रदान करती है जो डेवलपर के लिए Observable को संभालती है। हम दो प्रकार के कारखाने तरीकों को भेद कर सकते हैं: ठंड "जनरेटर" जो या तो वापस लौटते हैं और डाउनस्ट्रीम डिमांड और गर्म "पुशर्स" के आधार पर तत्वों को उत्पन्न करते हैं जो आमतौर पर गैर-प्रतिक्रियाशील और / या गैर-बैकस्परेबल डेटा स्रोतों को पुल करते हैं और शीर्ष पर कुछ बैकप्रेशर हैंडलिंग को परत करते हैं उन्हें।
केवल
सबसे बुनियादी backpressure जागरूक स्रोत just के माध्यम से बनाया गया है:
Observable.just(1).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(0);
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
// the rest is omitted for brevity
}
चूंकि हम स्पष्ट रूप से onStart में अनुरोध नहीं करते हैं, इसलिए यह कुछ भी प्रिंट नहीं करेगा। just तब अच्छा होता है जब एक स्थिर मूल्य हो जो हम एक अनुक्रम को जंप करना चाहते हैं।
दुर्भाग्य से, just अक्सर गणना कुछ गतिशील रूप से भस्म किया जाना है के लिए एक रास्ता के लिए गलत है Subscriber रों:
int counter;
int computeValue() {
return ++counter;
}
Observable<Integer> o = Observable.just(computeValue());
o.subscribe(System.out:println);
o.subscribe(System.out:println);
कुछ को आश्चर्यचकित करते हुए, यह क्रमशः 1 और 2 प्रिंट करने के बजाय 1 बार प्रिंट करता है। यदि कॉल फिर से लिखा जाता है, तो यह स्पष्ट हो जाता है कि यह क्यों काम करता है:
int temp = computeValue();
Observable<Integer> o = Observable.just(temp);
computeValue को मुख्य रूटीन का हिस्सा कहा जाता है और सब्सक्राइबर्स की प्रतिक्रिया के रूप में नहीं।
fromCallable
क्या लोगों को वास्तव में जरूरत विधि है fromCallable :
Observable<Integer> o = Observable.fromCallable(() -> computeValue());
यहाँ पर computeValue को तभी निष्पादित किया जाता है जब कोई ग्राहक सब्सक्राइब करता है और उनमें से प्रत्येक के लिए अपेक्षित 1 और 2 की छपाई होती है। स्वाभाविक रूप से, fromCallable भी fromCallable को ठीक से सपोर्ट करता है और जब तक अनुरोध नहीं किया जाता है, तब तक गणना मूल्य का उत्सर्जन नहीं करेगा। हालांकि ध्यान दें कि गणना वैसे भी होती है। यदि डाउनस्ट्रीम वास्तव में अनुरोध करता है, तो मामले में गणना में देरी हो सकती है, हम just map साथ उपयोग कर सकते हैं:
Observable.just("This doesn't matter").map(ignored -> computeValue())...
just अपने निरंतर मूल्य का उत्सर्जन तब तक नहीं किया जाएगा जब तक यह अनुरोध न किया जाए कि यह computeValue के परिणाम के लिए मैप किया गया है, फिर भी प्रत्येक ग्राहक के लिए व्यक्तिगत रूप से कॉल किया जाता है।
से
यदि डेटा पहले से ही वस्तुओं की एक सरणी के रूप में उपलब्ध है, वस्तुओं की सूची या किसी भी स्रोत से संबंधित है, तो अधिभार from संबंधित इस तरह के स्रोतों के Iterable और उत्सर्जन को संभाल लेगा:
Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);
सुविधा के लिए (और जेनेरिक सरणी निर्माण के बारे में चेतावनियों से बचने के लिए) 2 से 10 तर्क ओवरलोड हैं जो just आंतरिक रूप से प्रतिनिधि को देते from ।
from(Iterable) एक दिलचस्प अवसर भी देता है। कई मूल्य पीढ़ी को राज्य-मशीन के रूप में व्यक्त किया जा सकता है। प्रत्येक अनुरोधित तत्व एक राज्य संक्रमण और लौटे मूल्य की गणना को ट्रिगर करता है।
के रूप में ऐसी अवस्था की मशीनों का लेखन Iterable कुछ हद तक जटिल है (लेकिन अभी भी एक लेखन की तुलना में आसान Observable यह लेने के लिए) और विपरीत C #, जावा संकलक से किसी भी समर्थन (साथ बस प्रतिष्ठित लेखन की तलाश में कोड द्वारा इस तरह के राज्य मशीनों का निर्माण करने के लिए नहीं है yield return और yield break )। कुछ पुस्तकालय कुछ सहायता प्रदान करते हैं, जैसे कि Google Guava का AbstractIterable और IxJava का Ix.generate() और Ix.forloop() । ये अपने आप में एक पूरी श्रृंखला के योग्य हैं तो आइए देखते हैं कुछ बहुत ही मूल Iterable Source जो कुछ निरंतर मूल्य को अनिश्चित काल तक दोहराते हैं:
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return 1;
}
};
Observable.from(iterable).take(5).subscribe(System.out::println);
अगर हम क्लासिक फॉर-लूप के माध्यम से iterator उपभोग करेंगे, तो इसका परिणाम अनंत लूप में होगा। चूँकि हम इसका Observable करते हैं, इसलिए हम इसके पहले 5 का उपभोग करने के लिए अपनी इच्छा व्यक्त कर सकते हैं और फिर कुछ भी अनुरोध करना बंद कर सकते हैं। यह Observable s के अंदर आलसी मूल्यांकन और कंप्यूटिंग की सच्ची शक्ति है।
बनाने (SyncOnSubscribe)
कभी-कभी, प्रतिक्रियाशील दुनिया में परिवर्तित होने के लिए डेटा स्रोत समकालिक (अवरुद्ध) और पुल-लाइक होता है, अर्थात, हमें डेटा के अगले टुकड़े को प्राप्त करने के लिए कुछ get या read लिए कॉल करना होगा। एक, निश्चित रूप से, इसे एक Iterable में बदल सकते हैं, लेकिन जब ऐसे स्रोत संसाधनों से जुड़े होते हैं, तो हम उन संसाधनों को लीक कर सकते हैं यदि डाउनस्ट्रीम अनुक्रम को समाप्त होने से पहले ही Iterable देता है।
ऐसे मामलों को संभालने के लिए, RxJava में SyncOnSubscribe class है। कोई भी इसका विस्तार कर सकता है और अपने तरीकों को लागू कर सकता है या उदाहरण के निर्माण के लिए अपने लैम्ब्डा-आधारित कारखाने के तरीकों में से एक का उपयोग कर सकता है।
SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
() -> new FileInputStream("data.bin"),
(inputstream, output) -> {
try {
int byte = inputstream.read();
if (byte < 0) {
output.onCompleted();
} else {
output.onNext(byte);
}
} catch (IOException ex) {
output.onError(ex);
}
return inputstream;
},
inputstream -> {
try {
inputstream.close();
} catch (IOException ex) {
RxJavaHooks.onError(ex);
}
}
);
Observable<Integer> o = Observable.create(binaryReader);
आम तौर पर, SyncOnSubscribe 3 कॉलबैक का उपयोग करता है।
पहला कॉलबैक एक प्रति ग्राहक राज्य बनाने की अनुमति देता है, जैसे कि उदाहरण में FileInputStream ; फ़ाइल प्रत्येक व्यक्तिगत ग्राहक के लिए स्वतंत्र रूप से खोली जाएगी।
दूसरा कॉलबैक इस राज्य वस्तु को लेता है और एक आउटपुट Observer प्रदान करता है जिसके onXXX तरीकों को मूल्यों का उत्सर्जन करने के लिए कहा जा सकता है। इस कॉलबैक को जितनी बार अनुरोध किया गया है, उतनी बार निष्पादित किया जाता है। प्रत्येक मंगलाचरण में, यह कॉल करने के लिए है onNext एक बार ज्यादा से ज्यादा वैकल्पिक रूप से या तो द्वारा पीछा onError या onCompleted । उदाहरण में हम कहते हैं onCompleted() करता है, तो पढ़ने बाइट नकारात्मक है, यह दर्शाता है और फ़ाइल के अंत और कॉल onError मामले पढ़ा एक फेंकता में IOException ।
अंतिम कॉलबैक तब उलटा हो जाता है, जब डाउनस्ट्रीम अनसब्सक्राइब (इनपुटस्ट्रीम को बंद करना) या जब पिछले कॉलबैक को टर्मिनल तरीके कहते हैं; यह संसाधनों को मुक्त करने की अनुमति देता है। चूंकि सभी स्रोतों को इन सभी विशेषताओं की आवश्यकता नहीं है, SyncOnSubscribe के स्थिर तरीके चलो एक को उनके बिना उदाहरण बनाते हैं।
दुर्भाग्य से, जेवीएम और अन्य पुस्तकालयों में कई विधि कॉल की जाँच की गई अपवादों को फेंक दें और इस श्रेणी में उपयोग किए जाने वाले कार्यात्मक इंटरफेस के रूप में try-catch में लिपटे रहने की आवश्यकता है, चेक किए गए अपवादों को फेंकने की अनुमति न दें।
बेशक, हम अन्य विशिष्ट स्रोतों की नकल कर सकते हैं, जैसे कि इसके साथ एक अनबाउंड रेंज:
SyncOnSubscribe.createStateful(
() -> 0,
(current, output) -> {
output.onNext(current);
return current + 1;
},
e -> { }
);
इस सेटअप में, current 0 शुरू होता है और अगली बार लैम्ब्डा के आह्वान पर, पैरामीटर current में 1 ।
वहाँ का एक संस्करण है SyncOnSubscribe बुलाया AsyncOnSubscribe कि अपवाद है कि मध्यम कॉलबैक भी लंबे मूल्य है कि नीचे की ओर और कॉलबैक एक उत्पन्न करनी चाहिए से अनुरोध राशि का प्रतिनिधित्व करता है लेता है के साथ काफी समान दिखता है Observable ठीक उसी लंबाई के साथ। यह स्रोत तब इन सभी Observable को एक क्रम में बदल देता है।
AsyncOnSubscribe.createStateful(
() -> 0,
(state, requested, output) -> {
output.onNext(Observable.range(state, (int)requested));
return state + 1;
},
e -> { }
);
इस वर्ग की उपयोगिता के बारे में चल रही (गर्म) चर्चा है और आमतौर पर अनुशंसित नहीं है क्योंकि यह नियमित रूप से उम्मीदों को तोड़ता है कि यह वास्तव में उन उत्पन्न मूल्यों को कैसे उत्सर्जित करेगा और यह कैसे प्रतिक्रिया देगा, या यहां तक कि यह किस तरह के अनुरोध मूल्यों को प्राप्त करेगा। अधिक जटिल उपभोक्ता परिदृश्य।
बनाने (emitter)
कभी-कभी, एक Observable में लिपटे जाने वाला स्रोत पहले से ही गर्म होता है (जैसे कि माउस चलता है) या ठंडा लेकिन उसके एपीआई में बैकएस्परेबल नहीं है (जैसे कि एसिंक्रोनस नेटवर्क कॉलबैक)।
ऐसे मामलों को संभालने के लिए, RxJava के हालिया संस्करण ने create(emitter) फैक्ट्री विधि की शुरुआत की। यह दो पैरामीटर लेता है:
- कॉलबैक जिसे प्रत्येक आने वाले ग्राहक के लिए
Emitter<T>इंटरफ़ेस के उदाहरण के साथ कहा जाएगा, - एक
Emitter.BackpressureModeगणन जो कि डेवलपर को लागू होने के लिएEmitter.BackpressureModeव्यवहार को निर्दिष्ट करने के लिए अनिवार्य करता है। यह सामान्य मोड, जैसी ही हैonBackpressureXXXएक संकेत के अलावाMissingBackpressureExceptionया बस पूरी तरह इसके अंदर इस तरह के अतिप्रवाह अनदेखी।
ध्यान दें कि यह वर्तमान में उन बैकस्पेसure मोड के लिए अतिरिक्त मापदंडों का समर्थन नहीं करता है। एक उन अनुकूलन की जरूरत है, का उपयोग कर NONE backpressure मोड के रूप में और प्रासंगिक लागू करने onBackpressureXXX जिसके परिणामस्वरूप पर Observable जाने का रास्ता है।
इसके उपयोग के लिए पहला विशिष्ट मामला जब कोई पुश-आधारित स्रोत के साथ बातचीत करना चाहता है, जैसे कि GUI इवेंट। वे APIs addListener - addListener / removeListener कॉल के कुछ रूप removeListener करते हैं जिन्हें कोई भी उपयोग कर सकता है:
Observable.create(emitter -> {
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
emitter.setCancellation(() ->
button.removeListener(al));
}, BackpressureMode.BUFFER);
Emitter का उपयोग करने के लिए अपेक्षाकृत सरल है; एक कॉल कर सकते हैं onNext , onError और onCompleted यह और ऑपरेटर हैंडल backpressure और अपने आप ही सदस्यता खत्म करने के प्रबंधन पर। इसके अलावा, यदि लिपटे एपीआई रद्दीकरण का समर्थन करता है (जैसे कि उदाहरण में श्रोता को हटाना), तो कोई रद्दीकरण कॉलबैक दर्ज करने के लिए setCancellation (या Subscription -प्राप्त संसाधनों के लिए setSubscription ) का उपयोग कर सकता है, जो डाउनस्ट्रीम unsubscribes या onError / पर आने पर onError हो जाता है onCompleted को दिए गए Emitter उदाहरण पर कहा जाता है।
ये विधियाँ एक समय में केवल एक ही संसाधन को उत्सर्जक के साथ जोड़ने की अनुमति देती हैं और एक नया सेट करने पर पुराने को स्वचालित रूप से खोल देती है। एक से अधिक संसाधनों को संभालने के लिए है, तो एक बनाने CompositeSubscription , emitter के साथ संबद्ध और उसके बाद के लिए आगे संसाधनों को जोड़ने CompositeSubscription ही:
Observable.create(emitter -> {
CompositeSubscription cs = new CompositeSubscription();
Worker worker = Schedulers.computation().createWorker();
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
cs.add(worker);
cs.add(Subscriptions.create(() ->
button.removeActionListener(al));
emitter.setSubscription(cs);
}, BackpressureMode.BUFFER);
दूसरे परिदृश्य में आमतौर पर कुछ अतुल्यकालिक, कॉलबैक-आधारित एपीआई शामिल होते हैं जिन्हें एक Observable में Observable ।
Observable.create(emitter -> {
someAPI.remoteCall(new Callback<Data>() {
@Override
public void onSuccess(Data data) {
emitter.onNext(data);
emitter.onCompleted();
}
@Override
public void onFailure(Exception error) {
emitter.onError(error);
}
});
}, BackpressureMode.LATEST);
इस मामले में, प्रतिनिधिमंडल उसी तरह काम करता है। दुर्भाग्य से, आमतौर पर, ये शास्त्रीय कॉलबैक-शैली एपीआई रद्द करने का समर्थन नहीं करते हैं, लेकिन अगर वे करते हैं, तो कोई अपने रद्दकरण को प्रिवियोइज़ उदाहरणों की तरह सेटअप कर सकता है (शायद एक और अधिक शामिल तरीका)। LATEST बैकप्रेशर मोड के उपयोग पर ध्यान दें; अगर हम जानते हैं कि केवल एक ही मूल्य होगा, तो हमें BUFFER रणनीति की आवश्यकता नहीं है क्योंकि यह एक डिफ़ॉल्ट 128 तत्व लंबा बफर आवंटित करता है (जो आवश्यक रूप से बढ़ता है) जो कभी भी पूरी तरह से उपयोग नहीं होने वाला है।