खोज…


पैरामीटर

पैरामीटर विवरण
group.id उपभोक्ता समूह का नाम।
enable.auto.commit स्वचालित रूप से ऑफसेट; डिफ़ॉल्ट: सच
auto.commit.interval.ms कमिट के बीच मिलीसेकंड में न्यूनतम देरी (सक्षम होना आवश्यक है। enable.auto.commit=true ); डिफ़ॉल्ट: 5000
auto.offset.reset जब कोई वैध प्रतिबद्ध ऑफसेट नहीं मिला तो क्या करें; डिफ़ॉल्ट: नवीनतम ! +)
(+) संभावित मान विवरण
जल्द से जल्द स्वचालित रूप से ऑफसेट को जल्द से जल्द ऑफसेट पर रीसेट करें।
नवीनतम स्वचालित रूप से नवीनतम ऑफसेट को ऑफसेट रीसेट करें।
कोई नहीं यदि उपभोक्ता के समूह के लिए कोई पिछली ऑफसेट नहीं मिली है तो उपभोक्ता को छोड़ दें।
और कुछ उपभोक्ता को छोड़ दें।

कंज्यूमर ग्रुप क्या है

काफ्का 0.9 के रूप में, नया उच्च स्तरीय काफ्काकोसमेर ग्राहक उपलब्ध है। यह एक नए अंतर्निहित Kafka प्रोटोकॉल का फायदा उठाता है जो एक तथाकथित उपभोक्ता समूह में कई उपभोक्ताओं को संयोजित करने की अनुमति देता है। एक उपभोक्ता समूह को एक एकल तार्किक उपभोक्ता के रूप में वर्णित किया जा सकता है जो विषयों के एक सेट की सदस्यता लेता है। सभी विषयों पर विभाजन समूह के भीतर भौतिक उपभोक्ताओं के लिए आत्मसमर्पण कर रहे हैं, जैसे कि प्रत्येक पेटेंट को एक उपभोक्ता को छूट देने के लिए सौंपा गया है (एक एकल उपभोक्ता को कई विभाजन सौंपे जा सकते हैं)। एक ही समूह से संबंधित अविभाज्य उपभोक्ता अलग-अलग मेजबानों पर वितरित तरीके से चल सकते हैं।

उपभोक्ता समूहों की पहचान उनके group.id माध्यम से की group.id । एक कंज्यूमर ग्रुप की एक विशिष्ट ग्राहक उदाहरण सदस्य बनाने के लिए, यह समूह भी निर्दिष्ट करने के लिए पर्याप्त है group.id इस ग्राहक के लिए, ग्राहक के विन्यास के माध्यम से:

Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);

इस प्रकार, सभी उपभोक्ताओं है कि एक ही काफ्का क्लस्टर से कनेक्ट और एक ही का उपयोग group.id प्रपत्र एक कंज्यूमर ग्रुप। उपभोक्ता किसी भी समय एक समूह छोड़ सकते हैं और नए उपभोक्ता किसी भी समय एक समूह में शामिल हो सकते हैं। दोनों ही मामलों के लिए, एक तथाकथित असंतुलन शुरू हो जाता है और विभाजन को उपभोक्ता समूह के साथ आश्वस्त किया जाता है ताकि यह सुनिश्चित हो सके कि प्रत्येक विभाजन को समूह के भीतर एक उपभोक्ता द्वारा निर्बाध रूप से संसाधित किया जाता है।

ध्यान दें, कि एक भी KafkaConsumer एकल सदस्य के रूप में खुद के साथ एक उपभोक्ता समूह बनाता है।

उपभोक्ता ऑफसेट प्रबंधन और दोष-सहिष्णुता

KafkaConsumers एक काफका ब्रोकर से poll() माध्यम से संदेश का अनुरोध करते हैं और उनकी प्रगति को ऑफसेट के माध्यम से ट्रैक किया जाता है । प्रत्येक विषय के प्रत्येक विभाजन के भीतर प्रत्येक संदेश में एक तथाकथित ऑफसेट-विभाजन के भीतर उसकी तार्किक अनुक्रम संख्या होती है। एक KafkaConsumer प्रत्येक विभाजन के लिए अपने वर्तमान ऑफसेट को ट्रैक करता है जो इसे सौंपा गया है। ध्यान दें, कफका दलालों को उपभोक्ताओं के वर्तमान ऑफसेट के बारे में पता नहीं है। इस प्रकार, poll() पर poll() उपभोक्ता को अपने वर्तमान ऑफसेट को दलाल को भेजने की आवश्यकता होती है, जैसे कि दलाल संबंधित संदेशों को वापस कर सकता है, अर्थात। लगातार बड़े ऑफसेट वाले संदेश। उदाहरण के लिए, मान लें कि हमारे पास एक एकल विभाजन विषय है और वर्तमान ऑफसेट 5 के साथ एक एकल उपभोक्ता है। poll() उपभोक्ता भेजता है अगर दलाल को ऑफसेट करता है और दलाल 6,7,8 के लिए ऑफ़सेट के लिए संदेश लौटाता है, ...

क्योंकि उपभोक्ता अपने ऑफ़सेट को स्वयं ट्रैक करते हैं, यदि उपभोक्ता विफल हो जाता है तो यह जानकारी खो सकती है। इस प्रकार, ऑफ़सेट को मज़बूती से संग्रहीत किया जाना चाहिए, जैसे कि पुनः आरंभ करने पर, एक उपभोक्ता अपनी पुरानी ऑफसेट और फिर से शुरू करने वाले को छोड़ सकता है जहां से छोड़ा गया है। काफ्का में, ऑफसेट कमिट्स के माध्यम से इसके लिए अंतर्निहित समर्थन है । नई KafkaConsumer Kafka और Kafka के लिए अपने वर्तमान ऑफसेट को एक विशेष विषय __consumer_offsets नामक एक विशेष विषय में स्टोर कर __consumer_offsets । एक काफ़्का विषय के भीतर ऑफ़सेट को संग्रहीत करना केवल दोष-सहिष्णु नहीं है, बल्कि एक रिबैलेंस के दौरान अन्य उपभोक्ताओं को विभाजन को फिर से असाइन करने की अनुमति देता है। क्योंकि एक उपभोक्ता समूह के सभी उपभोक्ता सभी विभाजन के सभी प्रतिबद्ध ऑफसेट का उपयोग कर सकते हैं, पुनर्संतुलन पर, एक उपभोक्ता जिसे एक नया विभाजन सौंपा जाता है, वह इस विभाजन की प्रतिबद्ध ऑफसेट __consumer_offsets विषय से पढ़ता है और जहां पुराना उपभोक्ता शेष रहता है।

ऑफ़सेट कैसे करें

KafkaConsumers पृष्ठभूमि में स्वचालित रूप से ऑफ़सेट कर सकते हैं (कॉन्फ़िगरेशन पैरामीटर enable.auto.commit = true ) डिफ़ॉल्ट सेटिंग क्या है। उन ऑटो कॉम्पट को poll() भीतर किया जाता है ( जिसे आमतौर पर लूप में कहा जाता है )। कितनी बार ऑफ़सेट किया जाना चाहिए, इसे auto.commit.interval.ms माध्यम से कॉन्फ़िगर किया जा सकता है। क्योंकि, ऑटो कमिट poll() और poll() में एम्बेडेड होते हैं poll() उपयोगकर्ता कोड द्वारा कहा जाता है, यह पैरामीटर इंटर-कमिट-इंटरवल के लिए कम बाउंड को परिभाषित करता है।

ऑटो कमिट के विकल्प के रूप में, ऑफ़सेट को मैन्युअल रूप से भी प्रबंधित किया जा सकता है। इसके लिए, ऑटो कमिट अक्षम होना चाहिए ( enable.auto.commit = false )। मैन्युअल KafkaConsumers लिए KafkaConsumers दो विधियाँ प्रदान करता है, जिनका नाम है कमिटसिंक () और कमिटसिंक () । जैसा कि नाम से संकेत मिलता है, commitSync() एक ब्लॉकिंग कॉल है, जो ऑफसेट सफलतापूर्वक होने के बाद वापस लौटता है, जबकि commitAsync() तुरंत लौटता है। यदि आप यह जानना चाहते हैं कि क्या कोई वचन सफल हुआ था या नहीं, तो आप विधि पैरामीटर को कॉल बैक हैंडलर ( OffsetCommitCallback ) प्रदान कर सकते हैं। ध्यान दें, दोनों प्रतिबद्ध कॉलों में, उपभोक्ता नवीनतम poll() कॉल के ऑफसेट करता है। उदाहरण के लिए। हमें एक एकल उपभोक्ता के साथ एक विभाजन विषय और poll() लिए अंतिम कॉल poll() ऑफसेट 4,5,6 के साथ संदेश लौटाएं। प्रतिबद्ध होने पर, ऑफसेट 6 प्रतिबद्ध होगा क्योंकि यह उपभोक्ता क्लाइंट द्वारा ट्रैक की गई नवीनतम ऑफसेट है। एक ही समय में, commitSync() और commitAsync() दोनों अधिक नियंत्रण के लिए अनुमति देते हैं कि आप क्या ऑफसेट करना चाहते हैं: यदि आप संबंधित ओवरलोड का उपयोग करते हैं जो आपको एक Map<TopicPartition, OffsetAndMetadata> निर्दिष्ट करने की अनुमति देता है (यानी, नक्शे में निर्दिष्ट विभाजन का कोई सबसेट हो सकता है, और निर्दिष्ट ऑफसेट का कोई भी मूल्य हो सकता है)।

प्रतिबद्ध संतानों का शब्दार्थ

एक प्रतिबद्ध ऑफ़सेट इंगित करता है, कि इस ऑफ़सेट तक के सभी संदेश पहले ही संसाधित हो चुके हैं। इस प्रकार, जैसा कि ऑफसेट लगातार संख्याएं हैं, ऑफसेट X कम करने से स्पष्ट रूप से सभी ऑफसेट X से छोटे हो जाते हैं। इसलिए, प्रत्येक ऑफसेट को व्यक्तिगत रूप से करना आवश्यक नहीं है, और एक ही बार में कई ऑफ़सेट करना होता है, लेकिन केवल सबसे बड़ी ऑफ़सेट करना।

ध्यान दें, कि डिजाइन द्वारा अंतिम प्रतिबद्ध ऑफसेट की तुलना में एक छोटी ऑफसेट करना संभव है। यह किया जा सकता है, अगर संदेशों को दूसरी बार पढ़ा जाना चाहिए।

प्रसंस्करण की गारंटी देता है

ऑटो कमेट का उपयोग कम से कम एक बार प्रसंस्करण शब्दार्थ प्रदान करता है। अंतर्निहित धारणा यह है कि poll() केवल तभी कहा जाता है जब सभी पहले वितरित संदेशों को सफलतापूर्वक संसाधित किया गया हो। यह सुनिश्चित करता है, कि कोई संदेश नष्ट न हो क्योंकि प्रसंस्करण के बाद एक प्रतिबद्धता होती है। यदि कोई उपभोक्ता एक प्रतिबद्ध होने से पहले विफल हो जाता है, तो अंतिम प्रतिबद्ध के बाद सभी संदेश कफका से प्राप्त होते हैं और फिर से संसाधित होते हैं। हालाँकि, इस पुनरावृत्ति का परिणाम डुप्लिकेट हो सकता है, क्योंकि पिछले poll() कॉल से कुछ संदेश संसाधित हो सकते हैं, लेकिन विफलता ऑटो कमिट कॉल से ठीक पहले हुई।

यदि अधिकांश-एक बार प्रसंस्करण शब्दार्थों की आवश्यकता होती है, तो ऑटो कमिट को अक्षम किया जाना चाहिए और poll() बाद सीधे एक मैनुअल commitSync() किया जाना चाहिए। बाद में, संदेश संसाधित हो जाते हैं। यह सुनिश्चित करता है, कि संसाधित होने से पहले संदेश प्रतिबद्ध हैं और इस प्रकार दूसरी बार कभी नहीं पढ़ते हैं। बेशक, विफलता के मामले में कुछ संदेश खो सकता है।

मैं अपनी शुरुआत से टॉपिक कैसे पढ़ सकता हूं

किसी विषय को शुरू से पढ़ने के लिए कई रणनीतियाँ होती हैं। उन लोगों को समझाने के लिए, हमें पहले यह समझना होगा कि उपभोक्ता स्टार्टअप में क्या होता है। एक उपभोक्ता के स्टार्टअप पर, निम्नलिखित होता है:

  1. कॉन्फ़िगर किए गए उपभोक्ता समूह में शामिल हों, जो एक असंतुलन को ट्रिगर करता है और उपभोक्ता को विभाजन प्रदान करता है
  2. प्रतिबद्ध ऑफसेट की तलाश करें (सभी विभाजनों के लिए जो उपभोक्ता को सौंपा गया है)
  3. मान्य ऑफ़सेट के साथ सभी विभाजनों के लिए, इस ऑफ़सेट से फिर से शुरू करें
  4. मान्य ऑफसेट के साथ सभी विभाजनों के लिए, auto.offset.reset कॉन्फ़िगरेशन पैरामीटर के अनुसार ऑफ़सेट प्रारंभ करें

एक नया उपभोक्ता समूह शुरू करें

आप उसकी शुरूआत से एक विषय पर कार्रवाई करना चाहते हैं, तो आप सरल एक नया उपभोक्ता समूह शुरू कर सकते हैं (यानी, एक अप्रयुक्त चुनें group.id और सेट) auto.offset.reset = earliest । क्योंकि एक नए समूह के लिए कोई प्रतिबद्ध ऑफ़सेट नहीं हैं, इसलिए ऑटो ऑफ़सेट रीसेट ट्रिगर हो जाएगा और इस विषय की शुरुआत से खपत होगी। ध्यान दें, कि उपभोक्ता पुनरारंभ पर, यदि आप एक ही group.id उपयोग करते हैं। फिर से, यह फिर से शुरुआत से विषय को नहीं पढ़ेगा, लेकिन इसे फिर से शुरू करें जहां इसे छोड़ दिया है। इस प्रकार, इस रणनीति के लिए, आपको एक नया group.id आवंटित करना होगा। हर बार जब आप किसी विषय को उसकी शुरुआत से पढ़ना चाहते हैं।

समान ग्रुप आईडी का पुन: उपयोग करें

नया group.id सेट करने से बचने के लिए। प्रत्येक बार जब आप किसी विषय को उसकी शुरुआत से पढ़ना चाहते हैं, तो आप उपभोक्ता को पहली बार (अप्रयुक्त group.id का उपयोग करके) शुरू करने से पहले ऑटो कमिट ( enable.auto.commit = false माध्यम से) को निष्क्रिय कर सकते हैं group.id और सेटिंग auto.offset.reset = earliest )। इसके अतिरिक्त, आपको मैन्युअल रूप से कोई ऑफ़सेट नहीं करना चाहिए। क्योंकि ऑफ़सेट इस रणनीति का उपयोग करने के लिए प्रतिबद्ध नहीं हैं, फिर से शुरू होने पर, उपभोक्ता अपनी शुरुआत से विषय को फिर से पढ़ेगा।

हालाँकि, इस रणनीति के दो नुकसान हैं:

  1. यह गलती-सहिष्णु नहीं है
  2. समूह असंतुलन के अनुसार काम नहीं करता है

(1) क्योंकि ऑफ़सेट कभी प्रतिबद्ध नहीं होते हैं, एक विफल और रोका गया उपभोक्ता पुनः आरंभ करने पर उसी तरह से नियंत्रित होता है। दोनों ही मामलों के लिए, विषय की शुरुआत से ही खपत होगी। (2) क्योंकि ऑफसेट कभी भी प्रतिबद्ध नहीं होते हैं, पुनर्संतुलन पर नए असाइन किए गए विभाजन शुरू से ही उपभोक्ता होंगे।

इसलिए, यह रणनीति केवल एकल उपभोक्ता के साथ उपभोक्ता समूहों के लिए काम करती है और इसका उपयोग केवल विकास के उद्देश्य के लिए किया जाना चाहिए।

एक ही ग्रुप आईडी और प्रतिबद्ध का पुन: उपयोग करें

यदि आप अपने उपभोक्ता समूह में कई उपभोक्ताओं को दोष-सहिष्णु और / या उपयोग करना चाहते हैं, तो अपराध करना अनिवार्य है। इस प्रकार, यदि आप इसकी शुरुआत से किसी विषय को पढ़ना चाहते हैं, तो आपको उपभोक्ता स्टार्टअप पर प्रतिबद्ध ऑफसेट में हेरफेर करने की आवश्यकता है। इसके लिए, KafkaConsumer तीन तरीकों की seek() , seekToBeginning() , और seekToEnd() । जबकि seek() का उपयोग मनमाने ढंग से ऑफसेट सेट करने के लिए किया जा सकता है, दूसरे और तीसरे तरीके का उपयोग क्रमशः विभाजन की शुरुआत या अंत की तलाश में किया जा सकता है। इस प्रकार, विफलता पर और उपभोक्ता पुनः आरंभ करने पर मांग को छोड़ दिया जाएगा और उपभोक्ता इसे फिर से शुरू कर सकता है जहां से यह छोड़ दिया गया है। उपभोक्ता-स्टॉप-एंड-रिस्टार्ट-से-शुरुआत के लिए, seekToBeginning() आपके poll() लूप दर्ज करने से पहले स्पष्ट रूप से कहा जाएगा। ध्यान दें, कि एक ग्राहक एक समूह में शामिल होने के बाद ही seekXXX() उपयोग किया जा सकता है - इस प्रकार, seekXXX() का उपयोग करने से पहले "डमी-पोल" करना आवश्यक है। समग्र कोड कुछ इस तरह होगा:

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}


Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow