Szukaj…


Wprowadzenie

W tym temacie pokazano, jak tworzyć i konsumować rekordy w Javie.

SimpleConsumer (Kafka> = 0.9.0)

Wersja Kafka w wersji 0.9 wprowadziła całkowitą przeprojektowanie konsumenta kafki. Jeśli jesteś zainteresowany starym SimpleConsumer (0.8.X), spójrz na tę stronę . Jeśli Twoja instalacja Kafka jest nowsza niż 0.8.X, poniższe kody powinny działać od razu po wyjęciu z pudełka.

Konfiguracja i inicjalizacja

Kafka 0.9 nie obsługuje już Java 6 ani Scala 2.9. Jeśli nadal korzystasz z Java 6, rozważ uaktualnienie do obsługiwanej wersji.

Najpierw utwórz projekt maven i dodaj do swojej pom następujące zależności:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

Uwaga : nie zapomnij zaktualizować pola wersji dla najnowszych wydań (teraz> 0.10).

Konsument jest inicjowany przy użyciu obiektu Properties . Istnieje wiele właściwości umożliwiających precyzyjne dostosowanie zachowań konsumentów. Poniżej znajduje się minimalna wymagana konfiguracja:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers to początkowa lista brokerów, dzięki której konsument może odkryć resztę klastra. Nie muszą to być wszystkie serwery w klastrze: klient określi pełny zestaw żywych brokerów od brokerów z tej listy.

deserializer mówi konsumentowi, jak interpretować / deserializować klucze i wartości wiadomości. Tutaj używamy wbudowanego StringDeserializer .

Wreszcie group.id odpowiada grupie konsumentów tego klienta. Pamiętaj: wszyscy konsumenci z grupy konsumentów podzielą między siebie wiadomości (kafka zachowuje się jak kolejka wiadomości), podczas gdy konsumenci z różnych grup konsumentów otrzymają te same wiadomości (kafka działa jak system publikowania i subskrybowania).

Inne przydatne właściwości to:

  • auto.offset.reset : kontroluje, co zrobić, jeśli przesunięcie zapisane w Zookeeper jest niedostępne lub poza zakresem. Możliwe wartości są latest i earliest . Wszystko inne rzuci wyjątek;

  • enable.auto.commit : jeśli true (domyślnie), przesunięcie konsumenta jest okresowo (patrz auto.commit.interval.ms ) zapisywane w tle. Ustawienie wartości false i użycie parametru auto.offset.reset=earliest - służy do określenia, od czego konsument powinien zacząć, na wypadek, gdyby nie znaleziono informacji o popełnionym przesunięciu. earliest środki od początku przypisanej partycji tematycznej. latest środki z najwyższej liczby dostępnych przesunięć zatwierdzonych dla partycji. Jednak konsument Kafka zawsze wznawia od ostatniego zatwierdzonego przesunięcia, o ile zostanie znaleziony prawidłowy rekord przesunięcia (tj. auto.offset.reset . Najlepszym przykładem jest sytuacja, gdy zupełnie nowa grupa konsumentów subskrybuje dany temat. To wtedy używa auto.offset.reset aby określić, czy zacząć od początku (najwcześniej), czy od końca (najpóźniej) tematu.

  • session.timeout.ms : limit czasu sesji gwarantuje, że blokada zostanie zwolniona, jeśli konsument ulegnie awarii lub jeśli partycja sieciowa oddzieli konsumenta od koordynatora. W rzeczy samej:

    Będąc częścią grupy konsumentów, każdemu konsumentowi przypisany jest podzbiór partycji z tematów, które subskrybuje. Jest to w zasadzie blokada grupy na tych partycjach. Dopóki blokada jest utrzymywana, żaden inny członek grupy nie będzie mógł ich odczytać. Kiedy twój konsument jest zdrowy, właśnie tego chcesz. To jedyny sposób, aby uniknąć podwójnego zużycia. Ale jeśli konsument umrze z powodu awarii komputera lub aplikacji, musisz zwolnić tę blokadę, aby partycje mogły zostać przypisane do zdrowego członka. źródło

Pełna lista właściwości jest dostępna tutaj http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Tworzenie konsumentów i subskrypcja tematów

Gdy mamy już właściwości, tworzenie konsumenta jest łatwe:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

Po dokonaniu subskrypcji konsument może koordynować z resztą grupy, aby uzyskać przypisanie do partycji. Wszystko to jest obsługiwane automatycznie po rozpoczęciu korzystania z danych.

Ankieta podstawowa

Konsument musi mieć możliwość równoległego pobierania danych, potencjalnie z wielu partycji dla wielu tematów, prawdopodobnie rozrzuconych wśród wielu brokerów. Na szczęście wszystko jest obsługiwane automatycznie po rozpoczęciu korzystania z danych. Aby to zrobić, wystarczy wykonać poll w pętli, a konsument zajmie się resztą.

poll zwraca (prawdopodobnie pusty) zestaw wiadomości z przypisanych partycji.

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

Kod

Podstawowy przykład

Jest to najbardziej podstawowy kod, którego można użyć do pobierania wiadomości z tematu kafka.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

Przykład możliwy do uruchomienia

Konsument został zaprojektowany do działania we własnym wątku. Nie jest bezpieczny do użytku wielowątkowego bez zewnętrznej synchronizacji i prawdopodobnie nie jest dobrym pomysłem.

Poniżej znajduje się proste zadanie Runnable, które inicjuje konsumenta, subskrybuje listę tematów i wykonuje pętlę ankiety w nieskończoność, aż do zamknięcia na zewnątrz.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

Pamiętaj, że podczas ankiety używamy limitu czasu Long.MAX_VALUE , więc będzie on czekać w nieskończoność na nową wiadomość. Aby poprawnie zamknąć klienta, ważne jest, aby wywołać jego metodę shutdown() przed zakończeniem aplikacji.

Sterownik może użyć tego w następujący sposób:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0,9)

Konfiguracja i inicjalizacja

Najpierw utwórz projekt maven i dodaj do swojej pom następujące zależności:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

Producent jest inicjowany za pomocą obiektu Properties . Istnieje wiele właściwości pozwalających dostroić zachowanie producenta. Poniżej znajduje się minimalna wymagana konfiguracja:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers to początkowa lista jednego lub więcej brokerów, aby producent mógł odkryć resztę klastra. Właściwości serializer mówią Kafce, w jaki sposób należy kodować klucz i wartość komunikatu. Tutaj wyślemy wiadomości tekstowe. Chociaż nie jest to wymagane, ustawienie client.id ponieważ jest zawsze zalecane: pozwala to łatwo skorelować żądania brokera z instancją klienta, która go utworzyła.

Inne interesujące właściwości to:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

Możesz kontrolować trwałość wiadomości wysyłanych do Kafki poprzez ustawienie acks . Domyślna wartość „1” wymaga wyraźnego potwierdzenia od lidera partycji, że zapis się powiódł. Najsilniejszą gwarancją, jaką zapewnia Kafka, jest acks=all , która gwarantuje, że lider partycji nie tylko zaakceptował zapis, ale został pomyślnie zreplikowany do wszystkich replik synchronicznych. Możesz także użyć wartości „0”, aby zmaksymalizować przepustowość, ale nie masz gwarancji, że wiadomość została pomyślnie zapisana w dzienniku brokera, ponieważ w tym przypadku broker nawet nie wysyła odpowiedzi.

retries próba (domyślnie> 0) określa, czy producent próbuje ponownie wysłać wiadomość po awarii. Należy pamiętać, że przy ponownych próbach> 0 może wystąpić zmiana kolejności wiadomości, ponieważ ponowna próba może nastąpić po pomyślnym zapisaniu następnego zapisu.

Producenci Kafka próbują zbierać wysyłane wiadomości w partie, aby poprawić przepustowość. Za pomocą klienta Java można użyć batch.size aby kontrolować maksymalny rozmiar w bajtach każdej partii komunikatów. Aby dać więcej czasu na wypełnienie partii, możesz użyć linger.ms aby opóźnić wysłanie przez producenta. Wreszcie kompresję można włączyć za compression.type ustawienia compression.type .

Użyj buffer.memory aby ograniczyć całkowitą pamięć dostępną dla klienta Java do zbierania niewysłanych wiadomości. Po osiągnięciu tego limitu producent będzie blokował dodatkowe max.block.ms tak długo, jak max.block.ms zanim max.block.ms wyjątek. Ponadto, aby uniknąć przechowywania rekordów w kolejce przez czas nieokreślony, możesz ustawić limit czasu za pomocą request.timeout.ms .

Pełna lista właściwości jest dostępna tutaj . Sugeruję przeczytać ten artykuł z Confluent, aby uzyskać więcej informacji.

Wysyłanie wiadomości

Metoda send() jest asynchroniczna. Po wywołaniu dodaje rekord do bufora oczekujących wysyłek rekordów i natychmiast wraca. Umożliwia to producentowi grupowanie pojedynczych rekordów pod kątem wydajności.

Wynikiem wysyłania jest RecordMetadata określający partycję, do której wysłano rekord i przesunięcie, do którego został przypisany. Ponieważ wywołanie Send jest asynchroniczne, zwraca Future dla RecordMetadata, które zostaną przypisane do tego rekordu. Aby przejrzeć metadane, możesz wywołać funkcję get() , która będzie blokować do momentu zakończenia żądania lub użyć funkcji zwrotnej.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

Kod

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow