Suche…


Einführung

In diesem Thema wird gezeigt, wie Datensätze in Java erstellt und verwendet werden.

SimpleConsumer (Kafka> = 0.9.0)

Mit der 0.9-Version von Kafka wurde ein komplett neues Design des Kafka-Konsumenten eingeführt. Wenn Sie sich für den alten SimpleConsumer (0.8.X) interessieren, SimpleConsumer Sie diese Seite . Wenn Ihre Kafka-Installation neuer als 0.8.X ist, sollten die folgenden Codes sofort funktionieren.

Konfiguration und Initialisierung

Kafka 0.9 unterstützt Java 6 oder Scala 2.9 nicht mehr. Wenn Sie noch Java 6 verwenden, sollten Sie ein Upgrade auf eine unterstützte Version in Betracht ziehen.

Erstellen Sie zunächst ein Maven-Projekt und fügen Sie Ihrem Pom folgende Abhängigkeit hinzu:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

Hinweis : Vergessen Sie nicht, das Versionsfeld für die neuesten Versionen (jetzt> 0.10) zu aktualisieren.

Der Consumer wird mit einem Properties Objekt initialisiert. Es gibt viele Eigenschaften, mit denen Sie das Verbraucherverhalten genau einstellen können. Unten ist die minimale Konfiguration erforderlich:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

Bei den bootstrap-servers handelt es sich um eine erste Liste von Brokern, die der Konsument den Rest des Clusters ermitteln kann. Dies müssen nicht alle Server im Cluster sein: Der Client ermittelt die vollständige Anzahl der aktiven Broker aus den Brokern in dieser Liste.

Der deserializer teilt dem Benutzer mit, wie er die Nachrichtenschlüssel und -werte interpretieren / deserialisieren soll. Hier verwenden wir den integrierten StringDeserializer .

Schließlich entspricht die group.id der Verbrauchergruppe dieses Clients. Denken Sie daran: Alle Verbraucher einer Verbrauchergruppe teilen Nachrichten auf (kafka, die sich wie eine Nachrichtenwarteschlange verhalten), während Verbraucher aus verschiedenen Verbrauchergruppen dieselben Meldungen erhalten (kafka wie ein Publish-Subscribe-System).

Andere nützliche Eigenschaften sind:

  • auto.offset.reset : auto.offset.reset was zu tun ist, wenn der in Zookeeper gespeicherte Offset fehlt oder außerhalb des Bereichs liegt. Mögliche Werte sind latest und earliest . Alles andere wird eine Ausnahme auslösen.

  • enable.auto.commit : Wenn true (Standard), wird der Verbraucherversatz periodisch (siehe auto.commit.interval.ms ) im Hintergrund gespeichert. Wenn Sie den auto.offset.reset=earliest auf false und auto.offset.reset=earliest , können Sie festlegen, von wo aus der Consumer starten soll, falls keine festgeschriebenen Offset-Informationen gefunden werden. earliest bedeutet vom Beginn der zugewiesenen Themenpartition. latest Mittelwert aus der höchsten Anzahl verfügbarer festgeschriebener Offsets für die Partition. Der Kafka-Consumer wird jedoch immer vom letzten festgeschriebenen Offset auto.offset.reset solange ein gültiger Offset-Datensatz gefunden wird (dh auto.offset.reset ignoriert. Das beste Beispiel ist, wenn eine brandneue Consumer-Gruppe ein Thema abonniert. Dies ist der auto.offset.reset in dem sie verwendet wird auto.offset.reset , um zu bestimmen, ob am Anfang (frühesten) oder am Ende (neuesten) des Themas begonnen werden soll.

  • session.timeout.ms : Durch ein Sitzungszeitlimit wird sichergestellt, dass die Sperre session.timeout.ms wird, wenn der Verbraucher abstürzt oder wenn eine Netzwerkpartition den Verbraucher vom Koordinator isoliert. Tatsächlich:

    Wenn er zu einer Verbrauchergruppe gehört, wird jedem Verbraucher eine Teilmenge der Partitionen aus Themen zugewiesen, die er abonniert hat. Dies ist im Grunde eine Gruppensperre für diese Partitionen. Solange die Sperre gehalten wird, kann kein anderes Mitglied der Gruppe von ihnen lesen. Wenn Ihr Verbraucher gesund ist, ist dies genau das, was Sie wollen. Nur so können Sie doppelten Verbrauch vermeiden. Wenn der Consumer jedoch aufgrund eines Maschinen- oder Anwendungsfehlers stirbt, müssen Sie die Sperre aufheben, damit die Partitionen einem fehlerhaften Mitglied zugewiesen werden können. Quelle

Die vollständige Liste der Eigenschaften finden Sie hier http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Erstellung von Verbrauchern und Themenabonnement

Sobald wir die Eigenschaften haben, ist das Erstellen eines Verbrauchers einfach:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

Nach dem Abonnieren kann der Consumer mit dem Rest der Gruppe koordinieren, um die Partitionszuordnung zu erhalten. Dies wird automatisch erledigt, wenn Sie mit dem Datenverbrauch beginnen.

Grundlegende Umfrage

Der Verbraucher muss in der Lage sein, Daten parallel abzurufen, möglicherweise aus vielen Partitionen für viele Themen, die sich wahrscheinlich auf viele Broker erstrecken. Glücklicherweise wird dies alles automatisch erledigt, wenn Sie mit dem Datenverbrauch beginnen. Dazu müssen Sie poll eine poll in einer Schleife aufrufen, und der Verbraucher kümmert sich um den Rest.

poll gibt einen (möglicherweise leeren) Satz von Nachrichten aus den zugewiesenen Partitionen zurück.

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

Der Code

Grundlegendes Beispiel

Dies ist der einfachste Code, mit dem Sie Nachrichten aus einem Kafka-Thema abrufen können.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

Laufbares Beispiel

Der Consumer ist dafür ausgelegt, in einem eigenen Thread ausgeführt zu werden. Es ist nicht sicher für die Verwendung von Multithreading ohne externe Synchronisierung, und es ist wahrscheinlich keine gute Idee, es auszuprobieren.

Nachfolgend finden Sie eine einfache ausführbare Task, die den Verbraucher initialisiert, eine Liste von Themen abonniert und die Abfrageschleife unbegrenzt ausführt, bis sie extern heruntergefahren wird.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

Beachten Sie, dass wir während des Long.MAX_VALUE ein Timeout von Long.MAX_VALUE Es wird also unbegrenzt auf eine neue Nachricht gewartet. Um den Consumer ordnungsgemäß zu schließen, muss vor dem Beenden der Anwendung die Methode shutdown() aufgerufen werden.

Ein Treiber könnte es so verwenden:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0,9)

Konfiguration und Initialisierung

Erstellen Sie zunächst ein Maven-Projekt und fügen Sie Ihrem Pom folgende Abhängigkeit hinzu:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

Der Produzent wird mit einem Properties initialisiert. Es gibt viele Eigenschaften, mit denen Sie das Verhalten des Produzenten fein abstimmen können. Unten ist die minimale Konfiguration erforderlich:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

Bei den bootstrap-servers handelt es sich um eine erste Liste von einem oder mehreren Brokern, damit der Hersteller den Rest des Clusters ermitteln kann. Die Eigenschaften des serializer Kafka an, wie der Nachrichtenschlüssel und der Wert codiert werden sollen. Hier werden wir String-Nachrichten senden. Obwohl dies nicht erforderlich ist, wird das Festlegen einer client.id since immer empfohlen: Auf diese Weise können Sie Anforderungen auf dem Broker problemlos mit der client.id korrelieren, von der sie erstellt wurde.

Andere interessante Eigenschaften sind:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

Sie können die Haltbarkeit von Nachrichten, die an Kafka geschrieben werden, über die Einstellung " acks . Der Standardwert "1" erfordert eine explizite Bestätigung des Partitionsführers, dass das Schreiben erfolgreich war. Die stärkste Garantie, die Kafka bietet, ist acks=all garantiert, dass der Partitionsleiter nicht nur den Schreibvorgang akzeptiert, sondern auch erfolgreich auf alle in-sync-Replikate repliziert wurde. Sie können auch den Wert "0" verwenden, um den Durchsatz zu maximieren. Sie können jedoch nicht garantieren, dass die Nachricht erfolgreich in das Broker-Protokoll geschrieben wurde, da der Broker in diesem Fall nicht einmal eine Antwort sendet.

retries (Standardeinstellung> 0) legt fest, ob der Produzent versucht, die Nachricht nach einem Fehler erneut zu senden. Beachten Sie, dass bei Neuversuchen> 0 eine Neuordnung der Nachrichten auftreten kann, da der Neuversuch nach einem nachfolgenden Schreibvorgang erfolgen kann.

Kafka-Hersteller versuchen, gesendete Nachrichten in Batch zu sammeln, um den Durchsatz zu verbessern. Mit dem Java-Client können Sie batch.size , um die maximale Größe jedes Nachrichtenstapels in Byte zu steuern. Um mehr Zeit für das Füllen der linger.ms zu haben, können Sie linger.ms , damit der Produzent das Senden verzögert. Schließlich kann die compression.type mit der Einstellung Komprimierungstyp aktiviert werden.

Verwenden Sie buffer.memory , um den für den Java-Client verfügbaren Gesamtspeicher für das Sammeln nicht gesendeter Nachrichten zu begrenzen. Wenn dieses Limit erreicht ist, blockiert der Produzent zusätzliche Sends für max.block.ms bevor eine Ausnahme max.block.ms . Um zu vermeiden, dass Datensätze unbegrenzt in der Warteschlange request.timeout.ms werden, können Sie mit request.timeout.ms ein Timeout request.timeout.ms .

Die vollständige Liste der Eigenschaften finden Sie hier . Ich schlage vor, diesen Artikel von Confluent für weitere Details zu lesen.

Nachrichten senden

Die send() -Methode ist asynchron. Beim Aufruf fügt er den Datensatz zu einem Puffer mit ausstehenden Datensatzübertragungen hinzu und kehrt sofort zurück. Dies ermöglicht es dem Hersteller, einzelne Datensätze für die Effizienz zusammenzufassen.

Das Ergebnis von send ist ein RecordMetadata , das die Partition angibt, an die der Datensatz gesendet wurde, und den Offset, dem er zugewiesen wurde. Da der Sendeaufruf asynchron ist, gibt er eine Future für die RecordMetadata zurück, die diesem Datensatz zugewiesen wird. Um die Metadaten abzurufen, können Sie entweder get() aufrufen, das blockiert, bis die Anforderung abgeschlossen ist, oder einen Rückruf verwenden.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

Der Code

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow