apache-kafka
Producteur / Consommateur en Java
Recherche…
Introduction
Cette rubrique montre comment produire et consommer des enregistrements en Java.
SimpleConsumer (Kafka> = 0.9.0)
La version 0.9 de Kafka a introduit une refonte complète du consommateur de kafka. Si vous êtes intéressé par l'ancien SimpleConsumer (0.8.X), consultez cette page . Si votre installation de Kafka est plus récente que la 0.8.X, les codes suivants devraient être intégrés.
Configuration et initialisation
Kafka 0.9 ne prend plus en charge Java 6 ou Scala 2.9. Si vous êtes toujours sous Java 6, envisagez de passer à une version prise en charge.
Tout d'abord, créez un projet Maven et ajoutez la dépendance suivante dans votre pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Note : n'oubliez pas de mettre à jour le champ de version pour les dernières versions (maintenant> 0.10).
Le consommateur est initialisé à l'aide d'un objet Properties . Il y a beaucoup de propriétés vous permettant d'affiner le comportement du consommateur. Voici la configuration minimale requise:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Le bootstrap-servers est une liste initiale de courtiers permettant au consommateur de découvrir le reste du cluster. Cela ne doit pas nécessairement être tous les serveurs du cluster: le client déterminera l'ensemble complet des courtiers actifs parmi les courtiers de cette liste.
Le deserializer indique au consommateur comment interpréter / désérialiser les clés et les valeurs du message. Ici, nous utilisons le StringDeserializer .
Enfin, le group.id correspond au groupe de consommateurs de ce client. Rappelez-vous: tous les consommateurs d'un groupe de consommateurs diviseront les messages entre eux (kafka agissant comme une file d'attente de messages), tandis que les consommateurs de différents groupes de consommateurs recevront les mêmes messages (kafka agissant comme un système de publication / abonnement).
D'autres propriétés utiles sont:
auto.offset.reset: contrôle ce qu'il faut faire si le décalage stocké dans Zookeeper est manquant ou hors de portée. Les valeurs possibles sont leslatestet lesearliest. Tout le reste jettera une exception;enable.auto.commit: sitrue(valeur par défaut), le décalage du consommateur est périodiquement (voirauto.commit.interval.ms) enregistré en arrière-plan. Le paramétrer surfalseet utiliserauto.offset.reset=earliest- permet de déterminer à partir de quel endroit le consommateur doit commencer au cas où aucune information deauto.offset.reset=earliestne serait trouvée. signifie auearliestdepuis le début de la partition de sujet assignée.latestmoyens à partir du plus grand nombre de compensations validées disponibles pour la partition. Cependant, le consommateur Kafka reprendra toujours le dernier décalage validé tant qu'un enregistrement de décalage valide est trouvé (par exemple, en ignorantauto.offset.reset. Le meilleur exemple est lorsqu'un nouveau groupe de consommateurs s'abonne à un sujet.auto.offset.resetpour déterminer s'il faut commencer par le début (au plus tôt) ou la fin (la plus récente) du sujet.session.timeout.ms: un délai d'attente de session garantit que le verrou sera libéré en cas de panne du consommateur ou si une partition réseau isole le consommateur du coordinateur. Effectivement:Lorsqu'il fait partie d'un groupe de consommateurs, chaque consommateur se voit attribuer un sous-ensemble de partitions à partir des sujets auxquels il est abonné. Ceci est essentiellement un verrou de groupe sur ces partitions. Tant que le verrou est maintenu, aucun autre membre du groupe ne pourra en lire. Lorsque votre consommateur est en bonne santé, c'est exactement ce que vous voulez. C'est la seule façon d'éviter la consommation en double. Mais si le consommateur meurt à la suite d'une défaillance de l'ordinateur ou de l'application, vous devez libérer ce verrou pour pouvoir affecter les partitions à un membre sain. la source
La liste complète des propriétés est disponible ici http://kafka.apache.org/090/documentation.html#newconsumerconfigs .
Création du consommateur et abonnement au sujet
Une fois que nous avons les propriétés, créer un consommateur est facile:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );
Après vous être abonné, le consommateur peut se coordonner avec le reste du groupe pour obtenir son affectation de partition. Tout cela est géré automatiquement lorsque vous commencez à consommer des données.
Sondage de base
Le consommateur doit pouvoir extraire des données en parallèle, potentiellement à partir de nombreuses partitions, sur de nombreux sujets susceptibles de s’étendre à de nombreux courtiers. Heureusement, tout cela est géré automatiquement lorsque vous commencez à consommer des données. Pour ce faire, il suffit d'appeler le poll en boucle et le consommateur gère le reste.
poll renvoie un ensemble (éventuellement vide) de messages provenant des partitions affectées.
while( true ){
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( !records.isEmpty() ){
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
Le code
Exemple de base
C'est le code le plus élémentaire que vous pouvez utiliser pour récupérer des messages à partir d'un sujet kafka.
public class ConsumerExample09{
public static void main( String[] args ){
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "auto.offset.reset", "earliest" );
props.put( "enable.auto.commit", "false" );
props.put( "group.id", "octopus" );
try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
consumer.subscribe( Collections.singletonList( "test-topic" ) );
while( true ){
// poll with a 100 ms timeout
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( records.isEmpty() ) continue;
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
}
}
Exemple runnable
Le consommateur est conçu pour être exécuté dans son propre thread. Il n'est pas sûr pour une utilisation multithread sans synchronisation externe et ce n'est probablement pas une bonne idée d'essayer.
Vous trouverez ci-dessous une simple tâche Runnable qui initialise le consommateur, s'abonne à une liste de rubriques et exécute la boucle d'interrogation indéfiniment jusqu'à son extinction externe.
public class ConsumerLoop implements Runnable{
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop( int id, String groupId, List<String> topics ){
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092");
props.put( "group.id", groupId );
props.put( "auto.offset.reset", "earliest" );
props.put( "key.deserializer", StringDeserializer.class.getName() );
props.put( "value.deserializer", StringDeserializer.class.getName() );
this.consumer = new KafkaConsumer<>( props );
}
@Override
public void run(){
try{
consumer.subscribe( topics );
while( true ){
ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}catch( WakeupException e ){
// ignore for shutdown
}finally{
consumer.close();
}
}
public void shutdown(){
consumer.wakeup();
}
}
Notez que nous utilisons un délai d'attente de Long.MAX_VALUE lors du sondage, il attendra donc indéfiniment un nouveau message. Pour fermer correctement le consommateur, il est important d'appeler sa méthode shutdown() avant de terminer l'application.
Un pilote pourrait l'utiliser comme ceci:
public static void main( String[] args ){
int numConsumers = 3;
String groupId = "octopus";
List<String> topics = Arrays.asList( "test-topic" );
ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
final List<ConsumerLoop> consumers = new ArrayList<>();
for( int i = 0; i < numConsumers; i++ ){
ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
consumers.add( consumer );
executor.submit( consumer );
}
Runtime.getRuntime().addShutdownHook( new Thread(){
@Override
public void run(){
for( ConsumerLoop consumer : consumers ){
consumer.shutdown();
}
executor.shutdown();
try{
executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
}catch( InterruptedException e ){
e.printStackTrace();
}
}
} );
}
SimpleProducer (kafka> = 0.9)
Configuration et initialisation
Tout d'abord, créez un projet Maven et ajoutez la dépendance suivante dans votre pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Le producteur est initialisé à l'aide d'un objet Properties . Il y a beaucoup de propriétés vous permettant d'affiner le comportement du producteur. Voici la configuration minimale requise:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
Le bootstrap-servers est une liste initiale d'un ou plusieurs courtiers pour que le producteur puisse découvrir le reste du cluster. Les propriétés du serializer indiquent à Kafka comment la clé et la valeur du message doivent être codées. Ici, nous enverrons des messages de chaîne. Bien que cela ne soit pas obligatoire, la définition d'un client.id est toujours recommandée: cela vous permet de corréler facilement les requêtes sur le courtier avec l'instance cliente qui l'a client.id .
Les autres propriétés intéressantes sont:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
Vous pouvez contrôler la durabilité des messages écrits sur Kafka via le paramètre acks . La valeur par défaut de «1» nécessite un accusé de réception explicite du leader de la partition indiquant que l’écriture a réussi. La garantie la plus forte fournie par Kafka est d' acks=all , ce qui garantit que le leader de la partition a non seulement accepté l'écriture, mais qu'il a été répliqué avec succès sur toutes les répliques in-sync. Vous pouvez également utiliser une valeur de «0» pour optimiser le débit, mais vous ne pouvez pas garantir que le message a bien été écrit dans le journal du courtier, car le courtier n'envoie même pas de réponse dans ce cas.
retries (default to> 0) détermine si le producteur essaie de renvoyer le message après un échec. Notez qu'avec les tentatives> 0, le réordonnancement des messages peut se produire car la nouvelle tentative peut se produire après une écriture suivante réussie.
Les producteurs de Kafka tentent de collecter les messages envoyés dans des lots pour améliorer le débit. Avec le client Java, vous pouvez utiliser batch.size pour contrôler la taille maximale en octets de chaque lot de messages. Pour donner plus de temps aux lots à remplir, vous pouvez utiliser linger.ms pour que le producteur linger.ms un envoi. Enfin, la compression peut être activée avec le paramètre compression.type .
Utilisez buffer.memory pour limiter la mémoire totale disponible au client Java pour la collecte des messages non envoyés. Lorsque cette limite est atteinte, le producteur bloque les envois supplémentaires aussi longtemps que max.block.ms avant de max.block.ms une exception. De plus, pour éviter de conserver des enregistrements indéfiniment en file d'attente, vous pouvez définir un délai d'attente à l'aide de request.timeout.ms .
La liste complète des propriétés est disponible ici . Je suggère de lire cet article de Confluent pour plus de détails.
Envoi de messages
La méthode send() est asynchrone. Lorsqu'il est appelé, il ajoute l'enregistrement à un tampon des enregistrements en attente et le retourne immédiatement. Cela permet au producteur de regrouper des enregistrements individuels pour des raisons d'efficacité.
Le résultat de l'envoi est un RecordMetadata spécifiant la partition à laquelle l'enregistrement a été envoyé et le décalage RecordMetadata il a été affecté. Étant donné que l'appel d'envoi est asynchrone, il renvoie un Future pour le RecordMetadata qui sera affecté à cet enregistrement. Pour consulter les métadonnées, vous pouvez soit appeler get() , qui bloquera jusqu'à la fin de la requête, soit utiliser un rappel.
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
Le code
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}