Zoeken…


Invoering

Dit onderwerp laat zien hoe u records in Java kunt produceren en consumeren.

SimpleConsumer (Kafka> = 0.9.0)

De 0.9 release van Kafka introduceerde een compleet herontwerp van de Kafka-consument. Als u geïnteresseerd bent in de oude SimpleConsumer (0.8.X), kijk dan op deze pagina . Als uw Kafka-installatie nieuwer is dan 0.8.X, zouden de volgende codes meteen moeten werken.

Configuratie en initialisatie

Kafka 0.9 ondersteunt niet langer Java 6 of Scala 2.9. Als u nog steeds op Java 6 bent, kunt u overwegen een upgrade uit te voeren naar een ondersteunde versie.

Maak eerst een maven-project en voeg de volgende afhankelijkheid toe aan je pom:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

Opmerking : vergeet niet het versieveld bij te werken voor de nieuwste releases (nu> 0,10).

De consument wordt geïnitialiseerd met een object Properties . Er zijn veel eigenschappen waarmee u het consumentengedrag kunt verfijnen. Hieronder is de minimaal benodigde configuratie:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

De bootstrap-servers is een eerste lijst met makelaars zodat de consument de rest van het cluster kan ontdekken. Dit hoeven niet alle servers in het cluster te zijn: de client bepaalt de volledige set actieve brokers van de brokers in deze lijst.

De deserializer vertelt de consument hoe de berichtsleutels en waarden moeten worden geïnterpreteerd / gedeserialiseerd. Hier gebruiken we de ingebouwde StringDeserializer .

Ten slotte komt de group.id overeen met de consumentengroep van deze client. Onthoud: alle consumenten van een consumentengroep zullen berichten onderling splitsen (kafka die fungeert als een berichtenwachtrij), terwijl consumenten van verschillende consumentengroepen dezelfde berichten krijgen (kafka die fungeert als een publiceer-abonneer-systeem).

Andere nuttige eigenschappen zijn:

  • auto.offset.reset : bepaalt wat te doen als de offset opgeslagen in Zookeeper ontbreekt of buiten bereik is. Mogelijke waarden zijn de latest en earliest . Al het andere zal een uitzondering veroorzaken;

  • enable.auto.commit : indien true (standaard), wordt de consument-offset periodiek (zie auto.commit.interval.ms ) op de achtergrond opgeslagen. Als deze op false en auto.offset.reset=earliest wordt gebruikt, wordt bepaald waar de consument moet beginnen als er geen gecommitteerde auto.offset.reset=earliest wordt gevonden. earliest betekent vanaf het begin van de toegewezen onderwerppartitie. latest betekent van het hoogste aantal beschikbare toegewezen offset voor de partitie. Kafka-consument zal echter altijd hervatten vanaf de laatste gecommitteerde compensatie zolang een geldig compensatierecord wordt gevonden (dwz auto.offset.reset negeren. Het beste voorbeeld is wanneer een geheel nieuwe groep consumenten zich abonneert op een onderwerp. Dit is wanneer het gebruikt auto.offset.reset om te bepalen of vanaf het begin (vroegste) of het einde (laatste) van het onderwerp moet worden gestart.

  • session.timeout.ms : een sessietime-out zorgt ervoor dat het slot wordt vrijgegeven als de consument crasht of als een netwerkpartitie de consument van de coördinator isoleert. Inderdaad:

    Wanneer deel uitmakend van een consumentengroep, krijgt elke consument een subset van de partities van onderwerpen waarop hij is geabonneerd. Dit is eigenlijk een groepsslot op die partities. Zolang het slot wordt vastgehouden, kunnen geen andere leden in de groep van hen lezen. Wanneer uw consument gezond is, is dit precies wat u wilt. Het is de enige manier om dubbel gebruik te voorkomen. Maar als de consument sterft als gevolg van een machine- of applicatiefout, moet u dat slot vrijgeven zodat de partities aan een gezond lid kunnen worden toegewezen. bron

De volledige lijst met eigenschappen is hier beschikbaar http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Creatie van consumenten en abonnement op onderwerp

Zodra we de eigenschappen hebben, is het eenvoudig om een consument te maken:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

Nadat u zich hebt geabonneerd, kan de consument coördineren met de rest van de groep om de partitietoewijzing te krijgen. Dit wordt allemaal automatisch afgehandeld wanneer u gegevens begint te gebruiken.

Basic poll

De consument moet in staat zijn om gegevens parallel op te halen, mogelijk uit vele partities voor veel onderwerpen die waarschijnlijk verspreid zijn over veel makelaars. Gelukkig wordt dit allemaal automatisch afgehandeld wanneer u gegevens gaat gebruiken. Om dat te doen, hoef je alleen poll in een lus te bellen en de consument zorgt voor de rest.

poll retourneert een (mogelijk lege) set berichten van de toegewezen partities.

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

De code

Basis voorbeeld

Dit is de meest elementaire code die u kunt gebruiken om berichten van een kafka-onderwerp op te halen.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

Uitvoerbaar voorbeeld

De consument is ontworpen om in zijn eigen draad te worden uitgevoerd. Het is niet veilig voor multithreaded gebruik zonder externe synchronisatie en het is waarschijnlijk geen goed idee om het te proberen.

Hieronder staat een eenvoudige uitvoerbare taak die de consument initialiseert, zich abonneert op een lijst met onderwerpen en de poll-lus voor onbepaalde tijd uitvoert totdat deze extern wordt afgesloten.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

Houd er rekening mee dat we tijdens de peiling een time-out van Long.MAX_VALUE , dus deze wacht voor onbepaalde tijd op een nieuw bericht. Om de consument goed te sluiten, is het belangrijk om de methode shutdown() aan te roepen voordat de toepassing wordt beëindigd.

Een bestuurder kan het zo gebruiken:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0.9)

Configuratie en initialisatie

Maak eerst een maven-project en voeg de volgende afhankelijkheid toe aan je pom:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

De producent wordt geïnitialiseerd met een object Properties . Er zijn veel eigenschappen waarmee u het gedrag van de producent kunt afstemmen. Hieronder is de minimaal benodigde configuratie:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

De bootstrap-servers is een eerste lijst van een of meer makelaars voor de producent om de rest van het cluster te kunnen ontdekken. De eigenschappen van de serializer vertellen Kafka hoe de berichtsleutel en -waarde moeten worden gecodeerd. Hier sturen we string-berichten. Hoewel dit niet verplicht is, wordt het altijd aanbevolen om een client.id : hiermee kunt u eenvoudig aanvragen van de broker correleren met de clientinstantie die het heeft gemaakt.

Andere interessante eigenschappen zijn:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

U kunt de duurzaamheid van berichten die naar Kafka zijn geschreven, acks via de instelling voor acks . De standaardwaarde "1" vereist een expliciete bevestiging van de partitieleider dat het schrijven is geslaagd. De sterkste garantie die Kafka biedt, is met acks=all , wat garandeert dat niet alleen de partitieleider het schrijven heeft geaccepteerd, maar dat het met succes is gerepliceerd naar alle gesynchroniseerde replica's. U kunt ook een waarde van "0" gebruiken om de doorvoer te maximaliseren, maar u kunt niet garanderen dat het bericht met succes in het logboek van de broker is geschreven, omdat de broker in dit geval niet eens een antwoord verzendt.

retries (standaardwaarde> 0) bepaalt of de producent probeert het bericht opnieuw te verzenden na een fout. Merk op dat met nieuwe pogingen> 0 het opnieuw ordenen van berichten kan optreden, omdat het opnieuw proberen kan nadat een volgende schrijfactie is geslaagd.

Kafka-producenten proberen verzonden berichten in batches te verzamelen om de doorvoer te verbeteren. Met de Java-client kunt u batch.size gebruiken om de maximale grootte in bytes van elke berichtenbatch te beheren. Om meer tijd te geven voor het vullen van batches, kunt u linger.ms om de producent de verzending te laten vertragen. Ten slotte kan compressie worden ingeschakeld met de instelling compression.type .

Gebruik buffer.memory om het totale geheugen te beperken dat beschikbaar is voor de Java-client voor het verzamelen van niet-verzonden berichten. Wanneer deze limiet wordt bereikt, blokkeert de producent extra max.block.ms zolang max.block.ms voordat een uitzondering wordt max.block.ms . Om te voorkomen dat records voor onbepaalde tijd in de wachtrij blijven staan, kunt u bovendien een time-out instellen met request.timeout.ms .

De volledige lijst met eigenschappen is hier beschikbaar. Ik stel voor dit artikel van Confluent te lezen voor meer informatie.

Berichten versturen

De methode send() is asynchroon. Wanneer het wordt aangeroepen, wordt het record toegevoegd aan een buffer met wachtende records die worden verzonden en onmiddellijk worden geretourneerd. Hierdoor kan de producent afzonderlijke records samenvoegen voor efficiëntie.

Het resultaat van verzenden is een RecordMetadata die de partitie RecordMetadata het record is verzonden en de offset waaraan het is toegewezen. Omdat het verzendgesprek asynchroon is, wordt een Future geretourneerd voor de RecordMetadata die aan dit record worden toegewezen. Als u de metagegevens wilt raadplegen, kunt u get() bellen, die wordt geblokkeerd totdat het verzoek is voltooid, of een callback gebruiken.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

De code

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow