apache-kafka
जावा में निर्माता / उपभोक्ता
खोज…
परिचय
यह विषय दिखाता है कि जावा में रिकॉर्ड का उत्पादन और उपभोग कैसे किया जाता है।
सिंपलकॉंसर (काफ्का> = 0.9.0)
काफ्का की 0.9 रिलीज ने काफ्का उपभोक्ता का एक पूर्ण स्वरूप प्रस्तुत किया। यदि आप पुराने SimpleConsumer (0.8.X) में रुचि रखते हैं, तो इस पृष्ठ पर एक नज़र डालें। यदि आपका काफ्का इंस्टॉलेशन 0.8.X से नया है, तो निम्नलिखित कोड बॉक्स से बाहर काम करने चाहिए।
कॉन्फ़िगरेशन और आरंभीकरण
काफ्का 0.9 अब जावा 6 या स्काला 2.9 का समर्थन नहीं करता है। यदि आप अभी भी जावा 6 पर हैं, तो समर्थित संस्करण पर अपग्रेड करने पर विचार करें।
सबसे पहले, एक मावेन प्रोजेक्ट बनाएं और अपने पोम में निम्न निर्भरता जोड़ें:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
नोट : नवीनतम रिलीज़ के लिए संस्करण फ़ील्ड अपडेट करना न भूलें (अब> 0.10)।
उपभोक्ता को Properties ऑब्जेक्ट का उपयोग करके प्रारंभ किया जाता है। बहुत सारे गुण हैं जो आपको उपभोक्ता व्यवहार को ठीक करने की अनुमति देते हैं। नीचे आवश्यक न्यूनतम कॉन्फ़िगरेशन है:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
bootstrap-servers उपभोक्ता के लिए दलालों की प्रारंभिक सूची है जो क्लस्टर के बाकी हिस्सों की खोज करने में सक्षम है। क्लस्टर में सभी सर्वर होने की आवश्यकता नहीं है: क्लाइंट इस सूची में दलालों से जीवित दलालों का पूरा सेट निर्धारित करेगा।
deserializer उपभोक्ता को बताता है कि मैसेज कुंजियों और मूल्यों की व्याख्या / डिसेरिएलाइज़ कैसे करें। यहां, हम अंतर्निहित StringDeserializer उपयोग करते हैं।
अंत में, group.id इस ग्राहक के उपभोक्ता समूह से मेल खाता है। याद रखें: एक उपभोक्ता समूह के सभी उपभोक्ता उनके बीच संदेशों को विभाजित करेंगे (कफ़्का एक संदेश कतार की तरह काम कर रहे हैं), जबकि विभिन्न उपभोक्ता समूहों के उपभोक्ताओं को एक ही संदेश (प्रकाशन-सदस्यता प्रणाली की तरह कफ़्का अभिनय) मिलेगा।
अन्य उपयोगी गुण हैं:
auto.offset.reset: यदिauto.offset.resetमें संग्रहीत ऑफसेट या तो गायब है या आउट-ऑफ-रेंज है, तो क्या करें। संभावित मूल्यlatestऔरearliest। कुछ और एक अपवाद फेंक देगा;enable.auto.commit: अगरtrue(डिफ़ॉल्ट), उपभोक्ता ऑफसेट समय-समय पर है (देखेंauto.commit.interval.ms) पृष्ठभूमि में बचा लिया। इसेfalseसेट करना औरauto.offset.reset=earliestका उपयोगauto.offset.reset=earliest- यह निर्धारित करना है कि कोई प्रतिबद्ध ऑफसेट जानकारी नहीं मिलने की स्थिति में उपभोक्ता को कहां से शुरू करना चाहिए। असाइन किए गए विषय विभाजन की शुरुआत सेearliestसाधन। विभाजन के लिए उपलब्ध प्रतिबद्ध ऑफसेट की उच्चतम संख्या सेlatestसाधन। हालांकि, काफ्का उपभोक्ता हमेशा अंतिम प्रतिबद्ध ऑफसेट से तब तक फिर से शुरू होगा जब तक कि एक वैध ऑफसेट रिकॉर्ड पाया जाता है (यानीauto.offset.resetनजरअंदाजauto.offset.reset। सबसे अच्छा उदाहरण तब होता है जब कोई नया उपभोक्ता समूह किसी विषय की सदस्यता लेता है। यह तब होता है जब वह इसका उपयोग करता है।auto.offset.resetनिर्धारित करने के लिए शुरुआत में (जल्द से जल्द) या विषय के अंत (नवीनतम) से शुरू करने के लिए है या नहीं।session.timeout.ms: एक सत्र टाइमआउट यह सुनिश्चित करता है कि यदि उपभोक्ता दुर्घटनाग्रस्त हो जाता है या नेटवर्क विभाजन समन्वयक से उपभोक्ता को अलग करता है तो लॉक जारी किया जाएगा। वास्तव में:जब एक उपभोक्ता समूह का हिस्सा होता है, तो प्रत्येक उपभोक्ता को उन विषयों से विभाजन का एक उपसमुच्चय सौंपा जाता है, जिसके लिए उसने सदस्यता ली है। यह मूल रूप से उन विभाजनों पर एक समूह लॉक है। जब तक ताला लगा रहेगा, तब तक समूह का कोई अन्य सदस्य उनसे नहीं पढ़ पाएगा। जब आपका उपभोक्ता स्वस्थ होता है, तो यही आप चाहते हैं। यह एकमात्र तरीका है जिससे आप डुप्लिकेट खपत से बच सकते हैं। लेकिन अगर मशीन या एप्लिकेशन की विफलता के कारण उपभोक्ता की मृत्यु हो जाती है, तो आपको उस लॉक को जारी करने की आवश्यकता होती है ताकि विभाजन एक स्वस्थ सदस्य को सौंपा जा सके। स्रोत
संपत्तियों की पूरी सूची यहां उपलब्ध है http://kafka.apache.org/090/documentation.html#newconsumerconnigs ।
उपभोक्ता निर्माण और विषय सदस्यता
हमारे पास गुण होने के बाद, उपभोक्ता बनाना आसान है:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );
आपके द्वारा सदस्यता लेने के बाद, उपभोक्ता अपना विभाजन असाइनमेंट प्राप्त करने के लिए बाकी समूह के साथ समन्वय कर सकता है। जब आप डेटा का उपभोग करना शुरू करते हैं तो यह सब अपने आप हो जाता है।
मूल मतदान
उपभोक्ता को समानांतर में डेटा प्राप्त करने में सक्षम होने की आवश्यकता है, संभावित रूप से कई ब्रोकरों में फैले कई विषयों के लिए कई विभाजन से। सौभाग्य से, यह सभी स्वचालित रूप से नियंत्रित किया जाता है जब आप डेटा का उपभोग करना शुरू करते हैं। ऐसा करने के लिए, आपको बस एक लूप में कॉल poll करना होगा और उपभोक्ता बाकी को संभाल लेगा।
poll उन विभाजनों से संदेशों का एक (संभवतः खाली) सेट देता है जिन्हें असाइन किया गया था।
while( true ){
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( !records.isEmpty() ){
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
कोड
मूल उदाहरण
यह एक सबसे बुनियादी कोड है जिसका उपयोग आप कक्का विषय से संदेश लाने के लिए कर सकते हैं।
public class ConsumerExample09{
public static void main( String[] args ){
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "auto.offset.reset", "earliest" );
props.put( "enable.auto.commit", "false" );
props.put( "group.id", "octopus" );
try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
consumer.subscribe( Collections.singletonList( "test-topic" ) );
while( true ){
// poll with a 100 ms timeout
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( records.isEmpty() ) continue;
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
}
}
चल उदाहरण
उपभोक्ता को अपने स्वयं के धागे में चलाने के लिए डिज़ाइन किया गया है। यह बाहरी सिंक्रनाइज़ेशन के बिना मल्टीथ्रेडेड उपयोग के लिए सुरक्षित नहीं है और यह कोशिश करने के लिए एक अच्छा विचार नहीं है।
नीचे एक साधारण रन करने योग्य कार्य है जो उपभोक्ता को आरंभ करता है, विषयों की एक सूची की सदस्यता लेता है, और बाहरी रूप से बंद होने तक अनिश्चित काल तक पोल लूप को निष्पादित करता है।
public class ConsumerLoop implements Runnable{
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop( int id, String groupId, List<String> topics ){
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092");
props.put( "group.id", groupId );
props.put( "auto.offset.reset", "earliest" );
props.put( "key.deserializer", StringDeserializer.class.getName() );
props.put( "value.deserializer", StringDeserializer.class.getName() );
this.consumer = new KafkaConsumer<>( props );
}
@Override
public void run(){
try{
consumer.subscribe( topics );
while( true ){
ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}catch( WakeupException e ){
// ignore for shutdown
}finally{
consumer.close();
}
}
public void shutdown(){
consumer.wakeup();
}
}
ध्यान दें कि हम चुनाव के दौरान Long.MAX_VALUE टाइमआउट का उपयोग करते हैं, इसलिए यह एक नए संदेश के लिए अनिश्चित काल तक प्रतीक्षा करेगा। उपभोक्ता को ठीक से बंद करने के लिए, आवेदन समाप्त करने से पहले इसकी shutdown() विधि को कॉल करना महत्वपूर्ण है।
एक ड्राइवर इसे इस तरह इस्तेमाल कर सकता है:
public static void main( String[] args ){
int numConsumers = 3;
String groupId = "octopus";
List<String> topics = Arrays.asList( "test-topic" );
ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
final List<ConsumerLoop> consumers = new ArrayList<>();
for( int i = 0; i < numConsumers; i++ ){
ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
consumers.add( consumer );
executor.submit( consumer );
}
Runtime.getRuntime().addShutdownHook( new Thread(){
@Override
public void run(){
for( ConsumerLoop consumer : consumers ){
consumer.shutdown();
}
executor.shutdown();
try{
executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
}catch( InterruptedException e ){
e.printStackTrace();
}
}
} );
}
सिंपलप्रोड्यूसर (काफ्का> = 0.9)
कॉन्फ़िगरेशन और आरंभीकरण
सबसे पहले, एक मावेन प्रोजेक्ट बनाएं और अपने पोम में निम्न निर्भरता जोड़ें:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Properties ऑब्जेक्ट का उपयोग करके प्रोड्यूसर को इनिशियलाइज़ किया जाता है। बहुत सारे गुण हैं जो आपको निर्माता के व्यवहार को ठीक करने की अनुमति देते हैं। नीचे आवश्यक न्यूनतम कॉन्फ़िगरेशन है:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
bootstrap-servers प्रोड्यूसर के लिए एक या एक से अधिक दलालों की प्रारंभिक सूची है, जो बाकी क्लस्टर की खोज करने में सक्षम है। serializer गुणों ने काफ्का को बताया कि कैसे संदेश कुंजी और मूल्य को एन्कोड किया जाना चाहिए। यहां, हम स्ट्रिंग संदेश भेजेंगे। हालाँकि, इसकी आवश्यकता नहीं है, क्योंकि client.id स्थापना हमेशा से की जाती है: यह आपको ब्रोकर के अनुरोधों को आसानी से client.id करने की अनुमति देता है।
अन्य रोचक गुण हैं:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
आप acks सेटिंग के माध्यम से Kafka को लिखे गए संदेशों के स्थायित्व को नियंत्रित कर सकते हैं। "1" के डिफ़ॉल्ट मान के लिए विभाजन नेता से एक स्पष्ट पावती की आवश्यकता होती है जो लेखन सफल हुआ। कफ़्का द्वारा प्रदान की जाने वाली सबसे मजबूत गारंटी acks=all , जो इस बात की गारंटी देता है कि न केवल विभाजन के नेता ने लेखन को स्वीकार किया था, बल्कि इसे सफलतापूर्वक सभी में सिंक प्रतिकृति के लिए दोहराया गया था। आप थ्रूपुट को अधिकतम करने के लिए "0" के मान का उपयोग कर सकते हैं, लेकिन आपके पास इस बात की कोई गारंटी नहीं होगी कि ब्रोकर के लॉग में संदेश सफलतापूर्वक लिखा गया था क्योंकि ब्रोकर इस मामले में प्रतिक्रिया भी नहीं भेजता है।
यदि विफलता के बाद निर्माता संदेश भेजने का प्रयास करता है, तो retries (डिफ़ॉल्ट से 0 तक) निर्धारित करता है। ध्यान दें कि रिट्रिट्स> 0 के साथ, संदेश का पुन: प्रसारण तब हो सकता है, जब रिट्री निम्नलिखित लिखित सफल होने के बाद हो सकती है।
कफका उत्पादकों ने थ्रूपुट में सुधार करने के लिए भेजे गए संदेशों को बैचों में इकट्ठा करने का प्रयास किया। जावा क्लाइंट के साथ, आप प्रत्येक संदेश बैच के बाइट्स में अधिकतम आकार को नियंत्रित करने के लिए batch.size का उपयोग कर सकते हैं। बैचों को भरने के लिए अधिक समय देने के लिए, आप निर्माता देरी भेजने के लिए linger.ms का उपयोग कर सकते हैं। अंत में, संपीड़न को संपीड़न के साथ सक्षम किया जा सकता है। compression.type सेटिंग।
buffer.memory संदेशों को एकत्रित करने के लिए जावा क्लाइंट के लिए उपलब्ध कुल मेमोरी को सीमित करने के लिए buffer.memory का उपयोग करें। जब यह सीमा प्रभावित होती है, तो निर्माता अपवाद max.block.ms से पहले max.block.ms से अधिक भेज देता है। इसके अतिरिक्त, रिकॉर्ड को अनिश्चित काल तक रखने से बचने के लिए, आप request.timeout.ms का उपयोग करके टाइमआउट सेट कर सकते हैं।
संपत्तियों की पूरी सूची यहां उपलब्ध है । मेरा सुझाव है कि अधिक जानकारी के लिए इस लेख को कंफ्लुएंट से पढ़ें।
संदेश भेजना
send() विधि अतुल्यकालिक है। जब इसे कहा जाता है तो यह रिकॉर्ड को लंबित रिकॉर्ड भेजने और तुरंत वापस भेजने के बफर में जोड़ देता है। यह निर्माता को दक्षता के लिए व्यक्तिगत रिकॉर्ड को एक साथ बैचने की अनुमति देता है।
भेजने का परिणाम एक RecordMetadata जिसमें यह निर्दिष्ट किया जाता है कि विभाजन को रिकॉर्ड भेजा गया था और ऑफसेट को सौंपा गया था। चूँकि कॉल कॉल अतुल्यकालिक है इसलिए यह RecordMetadata के लिए Future देता है जिसे इस रिकॉर्ड को सौंपा जाएगा। मेटाडेटा से परामर्श करने के लिए, आप या तो कॉल get() कर सकते हैं, जो अनुरोध पूरा होने तक या कॉलबैक का उपयोग करने तक रोक देगा।
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
कोड
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}