apache-kafka
Produttore / consumatore in Java
Ricerca…
introduzione
Questo argomento mostra come produrre e consumare record in Java.
SimpleConsumer (Kafka> = 0.9.0)
La versione 0.9 di Kafka ha introdotto una riprogettazione completa del consumatore kafka. Se sei interessato al vecchio SimpleConsumer (0.8.X), dai un'occhiata a questa pagina . Se l'installazione di Kafka è più recente di 0.8.X, i seguenti codici dovrebbero funzionare immediatamente.
Configurazione e inizializzazione
Kafka 0.9 non supporta più Java 6 o Scala 2.9. Se si è ancora su Java 6, considerare l'aggiornamento a una versione supportata.
Per prima cosa, crea un progetto maven e aggiungi la seguente dipendenza nel tuo pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Nota : non dimenticare di aggiornare il campo versione per le ultime versioni (ora> 0.10).
Il consumatore viene inizializzato utilizzando un oggetto Properties . Ci sono molte proprietà che ti permettono di mettere a punto il comportamento del consumatore. Di seguito è la configurazione minima necessaria:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
I bootstrap-servers sono un elenco iniziale di broker per consentire al consumatore di scoprire il resto del cluster. Non è necessario che siano tutti i server nel cluster: il client determinerà il set completo di broker attivi dai broker in questo elenco.
Il deserializer indica al consumatore come interpretare / deserializzare le chiavi e i valori dei messaggi. Qui, usiamo StringDeserializer .
Infine, group.id corrisponde al gruppo di consumatori di questo cliente. Ricorda: tutti i consumatori di un gruppo di consumatori divideranno messaggi tra loro (kafka si comporta come una coda di messaggi), mentre i consumatori di diversi gruppi di consumatori riceveranno gli stessi messaggi (kafka si comporta come un sistema di sottoscrizione di pubblicazione).
Altre proprietà utili sono:
auto.offset.reset: controlla cosa fare se l'offset memorizzato in Zookeeper è mancante o fuori range. I valori possibili sono glilateste iearliest. Qualsiasi altra cosa genererà un'eccezione;enable.auto.commit: setrue(valore predefinito), l'offset del consumatore viene periodicamente (vedereauto.commit.interval.ms) salvato in background. Impostandolo sufalsee utilizzandoauto.offset.reset=earliest- è per determinare da dove deve partire l'utente nel caso in cui non venga trovata alcuna informazione di offset impegnata. mezzi menoearliestdall'inizio della partizione argomento assegnata.latestmezzo dal più alto numero di offset committed disponibili per la partizione. Tuttavia, il consumer Kafka riprenderà sempre dall'ultimo offset impegnato finché viene trovato un record di offset valido (ovvero ignorandoauto.offset.reset.) L'esempio migliore è quando un gruppo di consumatori nuovo di zecca si iscrive a un argomento.auto.offset.resetper determinare se iniziare dall'inizio (prima) o alla fine (più recente) dell'argomento.session.timeout.ms: un timeout della sessione garantisce che il blocco venga rilasciato se il consumatore si blocca o se una partizione di rete isola il consumatore dal coordinatore. Infatti:Quando si fa parte di un gruppo di consumatori, a ciascun consumatore viene assegnato un sottoinsieme delle partizioni dagli argomenti a cui è stato abbonato. Questo è fondamentalmente un blocco di gruppo su quelle partizioni. Finché il blocco è trattenuto, nessun altro membro del gruppo sarà in grado di leggere da loro. Quando il tuo consumatore è sano, questo è esattamente quello che vuoi. È l'unico modo per evitare il consumo duplicato. Ma se il consumatore muore a causa di un errore della macchina o dell'applicazione, è necessario che il blocco venga rilasciato in modo che le partizioni possano essere assegnate a un membro sano. fonte
L'elenco completo delle proprietà è disponibile qui http://kafka.apache.org/090/documentation.html#newconsumerconfigs .
Creazione di consumatori e abbonamento per argomento
Una volta che abbiamo le proprietà, creare un consumatore è facile:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );
Dopo esserti iscritto, il consumatore può coordinarsi con il resto del gruppo per ottenere l'assegnazione della partizione. Tutto ciò viene gestito automaticamente quando inizi a consumare dati.
Sondaggio di base
Il consumatore ha bisogno di essere in grado di recuperare i dati in parallelo, potenzialmente da molte partizioni per molti argomenti probabilmente distribuiti su molti broker. Fortunatamente, tutto questo viene gestito automaticamente quando inizi a consumare dati. Per farlo, tutto ciò che devi fare è chiamare il poll in un ciclo e il consumatore gestirà il resto.
poll restituisce un (forse vuoto) insieme di messaggi dalle partizioni che sono state assegnate.
while( true ){
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( !records.isEmpty() ){
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
Il codice
Esempio di base
Questo è il codice più semplice che puoi usare per recuperare i messaggi da un argomento di kafka.
public class ConsumerExample09{
public static void main( String[] args ){
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "auto.offset.reset", "earliest" );
props.put( "enable.auto.commit", "false" );
props.put( "group.id", "octopus" );
try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
consumer.subscribe( Collections.singletonList( "test-topic" ) );
while( true ){
// poll with a 100 ms timeout
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( records.isEmpty() ) continue;
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
}
}
Esempio eseguibile
Il consumatore è progettato per essere eseguito nella propria thread. Non è sicuro per l'uso con multithreading senza sincronizzazione esterna e probabilmente non è una buona idea da provare.
Di seguito è riportata una semplice attività Runnable che inizializza l'utente, sottoscrive un elenco di argomenti ed esegue il ciclo di polling indefinitamente fino allo spegnimento esterno.
public class ConsumerLoop implements Runnable{
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop( int id, String groupId, List<String> topics ){
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092");
props.put( "group.id", groupId );
props.put( "auto.offset.reset", "earliest" );
props.put( "key.deserializer", StringDeserializer.class.getName() );
props.put( "value.deserializer", StringDeserializer.class.getName() );
this.consumer = new KafkaConsumer<>( props );
}
@Override
public void run(){
try{
consumer.subscribe( topics );
while( true ){
ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}catch( WakeupException e ){
// ignore for shutdown
}finally{
consumer.close();
}
}
public void shutdown(){
consumer.wakeup();
}
}
Tieni presente che durante il sondaggio utilizziamo un timeout di Long.MAX_VALUE , pertanto attenderà indefinitamente un nuovo messaggio. Per chiudere correttamente il consumatore, è importante chiamare il suo metodo shutdown() prima di terminare l'applicazione.
Un driver potrebbe usarlo in questo modo:
public static void main( String[] args ){
int numConsumers = 3;
String groupId = "octopus";
List<String> topics = Arrays.asList( "test-topic" );
ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
final List<ConsumerLoop> consumers = new ArrayList<>();
for( int i = 0; i < numConsumers; i++ ){
ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
consumers.add( consumer );
executor.submit( consumer );
}
Runtime.getRuntime().addShutdownHook( new Thread(){
@Override
public void run(){
for( ConsumerLoop consumer : consumers ){
consumer.shutdown();
}
executor.shutdown();
try{
executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
}catch( InterruptedException e ){
e.printStackTrace();
}
}
} );
}
SimpleProducer (kafka> = 0,9)
Configurazione e inizializzazione
Per prima cosa, crea un progetto maven e aggiungi la seguente dipendenza nel tuo pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Il produttore viene inizializzato utilizzando un oggetto Properties . Ci sono molte proprietà che ti permettono di mettere a punto il comportamento del produttore. Di seguito è la configurazione minima necessaria:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
I bootstrap-servers sono un elenco iniziale di uno o più broker per consentire al produttore di scoprire il resto del cluster. Le proprietà serializer dicono a Kafka come la chiave e il valore del messaggio dovrebbero essere codificati. Qui, invieremo messaggi di stringa. Sebbene non sia richiesto, l'impostazione di un client.id è sempre consigliata: ciò consente di correlare facilmente le richieste sul broker con l'istanza client che lo ha creato.
Altre proprietà interessanti sono:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
Puoi controllare la durata dei messaggi scritti su Kafka attraverso l'impostazione degli acks . Il valore predefinito di "1" richiede un riconoscimento esplicito dal leader della partizione che la scrittura ha avuto esito positivo. La più forte garanzia offerta da Kafka è acks=all , che garantisce che non solo il leader della partizione accetta la scrittura, ma è stata replicata con successo su tutte le repliche in-sync. Puoi anche utilizzare il valore "0" per massimizzare il throughput, ma non avrai alcuna garanzia che il messaggio sia stato scritto correttamente nel log del broker poiché il broker non invia nemmeno una risposta in questo caso.
retries (predefinito su> 0) determina se il produttore tenta di inviare nuovamente il messaggio dopo un errore. Si noti che con i tentativi> 0, può verificarsi un riordino dei messaggi poiché il tentativo può verificarsi dopo una successiva scrittura riuscita.
I produttori di Kafka tentano di raccogliere i messaggi inviati in lotti per migliorare il rendimento. Con il client Java, è possibile utilizzare batch.size per controllare la dimensione massima in byte di ciascun batch di messaggi. Per dare più tempo per i lotti da riempire, puoi usare linger.ms per far ritardare la spedizione al produttore. Infine, la compressione può essere abilitata con l'impostazione compression.type .
Utilizzare buffer.memory per limitare la memoria totale disponibile al client Java per la raccolta di messaggi non inviati. Quando viene raggiunto questo limite, il produttore bloccherà le mandate aggiuntive fino a max.block.ms prima di generare un'eccezione. Inoltre, per evitare di mantenere i record accodati indefinitamente, è possibile impostare un timeout utilizzando request.timeout.ms .
L'elenco completo delle proprietà è disponibile qui . Suggerisco di leggere questo articolo di Confluent per maggiori dettagli.
Invio di messaggi
Il metodo send() è asincrono. Quando viene chiamato, aggiunge il record a un buffer di record in sospeso inviati e restituisce immediatamente. Ciò consente al produttore di raggruppare i record individuali per l'efficienza.
Il risultato di send è un RecordMetadata specifica la partizione su cui è stato inviato il record e l'offset che è stato assegnato. Poiché la chiamata di invio è asincrona, restituisce un Future per i RecordMetadata che verranno assegnati a questo record. Per consultare i metadati, è possibile chiamare get() , che bloccherà fino a quando la richiesta non verrà completata o non verrà utilizzata una richiamata.
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
Il codice
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}