Sök…


Introduktion

Detta ämne visar hur man producerar och konsumerar poster i Java.

SimpleConsumer (Kafka> = 0,9,0)

Kafka-utgåvan 0,9 introducerade en fullständig omdesign av kafka-konsumenten. Om du är intresserad av den gamla SimpleConsumer (0.8.X), titta på den här sidan . Om din Kafka-installation är nyare än 0,8.X bör följande koder fungera.

Konfiguration och initialisering

Kafka 0.9 stöder inte längre Java 6 eller Scala 2.9. Om du fortfarande är på Java 6 kan du överväga att uppgradera till en version som stöds.

Skapa först ett maven-projekt och lägg till följande beroende i din pom:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

Obs : glöm inte att uppdatera versionfältet för de senaste utgåvorna (nu> 0,10).

Konsumenten initialiseras med ett Properties objekt. Det finns många egenskaper som gör att du kan finjustera konsumentbeteendet. Nedan visas den minimala konfigurationen som behövs:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers är en initial lista över mäklare för konsumenten för att kunna upptäcka resten av klustret. Detta behöver inte vara alla servrar i klustret: klienten bestämmer hela uppsättningen av levande mäklare från mäklarna i denna lista.

deserializer berättar för konsumenten hur man tolkar / deserialiserar meddelandetangenterna och värdena. Här använder vi den inbyggda StringDeserializer .

Slutligen motsvarar group.id konsumentgrupp. Kom ihåg: alla konsumenter i en konsumentgrupp delar upp meddelanden mellan dem (kafka fungerar som en meddelandekö), medan konsumenter från olika konsumentgrupper får samma meddelanden (kafka fungerar som ett public-prenumerationssystem).

Andra användbara egenskaper är:

  • auto.offset.reset : styr vad som ska göras om den offset som lagras i Zookeeper antingen saknas eller är utanför räckvidden. Möjliga värden är latest och earliest . Allt annat kommer att kasta ett undantag;

  • enable.auto.commit : om det är true (standard) sparas konsumentförskjutningen regelbundet (se auto.commit.interval.ms ) i bakgrunden. Att ställa in det till false och använda auto.offset.reset=earliest - är att avgöra var bör konsumenten börja ifall ingen begåvad offsetinformation hittas. earliest betyder från början av den tilldelade ämnespartitionen. latest medel från det högsta antalet tillgängliga engagerade offset för partitionen. Kafka-konsumenten kommer emellertid alltid att återuppta från den senaste begåda förskjutningen så länge en giltig offset-post hittas (dvs. ignorera auto.offset.reset . Det bästa exemplet är när en helt ny konsumentgrupp prenumererar på ett ämne. Detta är när den använder auto.offset.reset att avgöra om man börjar från början (tidigast) eller till slutet (senast) av ämnet.

  • session.timeout.ms : en session timeout säkerställer att låset släpps om konsumenten kraschar eller om en nätverkspartition isolerar konsumenten från koordinatorn. Verkligen:

    När en del av en konsumentgrupp tilldelas varje konsument en delmängd av partitionerna från ämnen den har prenumererat på. Detta är i princip ett grupplås på dessa partitioner. Så länge låset hålls kan inga andra medlemmar i gruppen läsa från dem. När din konsument är frisk är det precis vad du vill ha. Det är det enda sättet du kan undvika duplikatförbrukning. Men om konsumenten dör på grund av en maskin- eller applikationsfel, behöver du det låset frigöras så att partitionerna kan tilldelas en frisk medlem. källa

Den fullständiga listan över egenskaper finns här http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Konsumentskapande och prenumerationsabonnemang

När vi väl har egenskaperna är det enkelt att skapa en konsument:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

När du har prenumererat kan konsumenten samordna sig med resten av gruppen för att få sin partitionsuppgift. Allt hanteras automatiskt när du börjar konsumera data.

Grundläggande enkät

Konsumenten måste kunna hämta data parallellt, eventuellt från många partitioner för många ämnen som troligen är spridda över många mäklare. Lyckligtvis hanteras allt detta automatiskt när du börjar konsumera data. För att göra det, allt du behöver göra är att ringa poll i en slinga och konsumenten hanterar resten.

poll ger en (möjligen tom) uppsättning meddelanden från de partitioner som tilldelades.

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

Koden

Grundläggande exempel

Detta är den mest grundläggande koden du kan använda för att hämta meddelanden från ett kafka-ämne.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

Körbart exempel

Konsumenten är utformad för att köras i sin egen tråd. Det är inte säkert för flertrådad användning utan extern synkronisering och det är förmodligen inte en bra idé att prova.

Nedan följer en enkel Runnable-uppgift som initialiserar konsumenten, prenumererar på en lista med ämnen och kör pollingslingan på obestämd tid tills den stängs av externt.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

Observera att vi använder en timeout på Long.MAX_VALUE under undersökningen, så den kommer att vänta på obestämd tid för ett nytt meddelande. För att stänga konsumenten ordentligt är det viktigt att anropa sin shutdown() innan du avslutar applikationen.

En förare kan använda den så här:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0,9)

Konfiguration och initialisering

Skapa först ett maven-projekt och lägg till följande beroende i din pom:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

Producenten initialiseras med ett Properties objekt. Det finns många egenskaper som gör att du kan finjustera producentens beteende. Nedan visas den minimala konfigurationen som behövs:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers är en initial lista med en eller flera mäklare för producenten för att kunna upptäcka resten av klustret. serializer berättar för Kafka hur meddelandenyckeln och värdet ska kodas. Här skickar vi strängmeddelanden. Även om det inte krävs, ställer du in en client.id eftersom det alltid rekommenderas: detta gör att du enkelt kan korrelera förfrågningar på mäklaren med klientinstansen som gjorde det.

Andra intressanta egenskaper är:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

Du kan styra hållbarheten för meddelanden som skrivits till Kafka genom acks inställningen. Standardvärdet för "1" kräver ett uttryckligt erkännande från partitionsledaren att skrivningen lyckades. Den starkaste garantin som Kafka tillhandahåller är med acks=all , vilket garanterar att inte bara partitionsledaren accepterade skrivet, utan det replikerades framgångsrikt till alla synkroniserade repliker. Du kan också använda ett värde på "0" för att maximera genomströmningen, men du har ingen garanti för att meddelandet har skrivits till mäklarens logg eftersom mäklaren inte ens skickar ett svar i det här fallet.

retries (standard till> 0) avgör om tillverkaren försöker skicka meddelandet igen efter ett fel. Observera att med omprövningar> 0 kan omordnandet av meddelanden inträffa eftersom återförsöket kan inträffa efter att följande skrivning har lyckats.

Kafka-tillverkare försöker samla in skickade meddelanden i partier för att förbättra genomströmningen. Med Java-klienten kan du använda batch.size att styra maximal storlek i byte för varje meddelandeparti. För att ge mer tid för att satserna ska fyllas kan du använda linger.ms att få producenten att försena sändningen. Slutligen kan komprimering aktiveras med inställningen compression.type .

Använd buffer.memory att begränsa det totala minnet som är tillgängligt för Java-klienten för att samla in ej skickade meddelanden. När denna gräns träffas kommer producenten att blockera ytterligare sändningar så länge som max.block.ms innan han gör ett undantag. För att undvika att hålla poster i kö på obestämd tid kan du ställa in en timeout med request.timeout.ms .

Den kompletta listan över fastigheter finns här . Jag föreslår att du läser den här artikeln från Confluent för mer information.

Skickar meddelanden

Metoden send() är asynkron. När den anropas läggs posten till en buffert med väntande postskick och returnerar omedelbart. Detta gör det möjligt för tillverkaren att samla enskilda poster för effektivitet.

Resultatet av skicka är en RecordMetadata anger den partition posten skickades till och den offset den tilldelades. Eftersom det sändande samtalet är asynkron returnerar det en Future för RecordMetadata som kommer att tilldelas denna post. För att konsultera metadata kan du antingen ringa get() , vilket kommer att blockeras tills begäran är klar eller använda ett återuppringning.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

Koden

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow