Recherche…


Paramètres

Paramètre La description
group.id Le nom du groupe de consommateurs.
enable.auto.commit Commettre automatiquement des compensations; default: true .
auto.commit.interval.ms Le délai minimum en millisecondes entre les enable.auto.commit=true (nécessite enable.auto.commit=true ); par défaut: 5000 .
auto.offset.reset Que faire en l'absence de validation validée validée? par défaut: le plus récent (+)
(+) Valeurs possibles La description
au plus tôt Réinitialise automatiquement le décalage au décalage le plus précoce.
dernier Rétablir automatiquement le décalage au dernier décalage.
aucun Jetez une exception au consommateur si aucun décalage précédent n'a été trouvé pour le groupe du consommateur.
rien d'autre Jetez une exception au consommateur.

Qu'est-ce qu'un groupe de consommateurs?

À partir de Kafka 0.9, le nouveau client de haut niveau KafkaConsumer est disponible. Il exploite un nouveau protocole Kafka intégré qui permet de combiner plusieurs consommateurs dans un groupe appelé Consumer Group . Un groupe de consommateurs peut être décrit comme un consommateur logique unique qui souscrit à un ensemble de rubriques. Les partions de tous les sujets sont attribuées aux consommateurs physiques du groupe, de sorte que chaque demande est attribuée à un seul consommateur (un seul consommateur peut recevoir plusieurs parties attribuées). Les consommateurs individuels appartenant au même groupe peuvent fonctionner sur des hôtes différents de manière distribuée.

Les groupes de consommateurs sont identifiés via leur group.id . Pour créer un membre d'instance client spécifique d'un groupe de consommateurs, il suffit d'affecter les groupes group.id à ce client, via la configuration du client:

Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);

Ainsi, tous les consommateurs qui se connectent au même cluster Kafka et utilisent le même group.id forment un groupe de consommateurs. Les consommateurs peuvent quitter un groupe à tout moment et les nouveaux consommateurs peuvent rejoindre un groupe à tout moment. Dans les deux cas, un soi-disant rééquilibrage est déclenché et les partitions sont réaffectées au groupe de consommateurs pour garantir que chaque partition est traitée par un seul consommateur du groupe.

Faites attention, même un seul KafkaConsumer forme un groupe de consommateurs en tant que membre unique.

Gestion des crédits à la consommation et tolérance aux fautes

KafkaConsumers demande des messages à un courtier Kafka via un appel à poll() et leur progression est suivie par des décalages . Chaque message de chaque partition de chaque sujet est associé à un décalage, c'est-à-dire son numéro de séquence logique dans la partition. Un KafkaConsumer suit son décalage actuel pour chaque partition qui lui est affectée. Faites attention, les courtiers Kafka ne sont pas au courant des compensations actuelles des consommateurs. Ainsi, sur poll() le consommateur doit envoyer ses décalages actuels au courtier, de sorte que le courtier puisse renvoyer les messages correspondants, c.-à-d. messages avec un décalage consécutif plus important. Par exemple, supposons que nous ayons un seul sujet de partition et un seul consommateur avec le décalage actuel 5. Sur poll() le consommateur envoie le décalage au courtier et le courtier renvoie les messages pour les décalages 6,7,8, ...

Étant donné que les consommateurs suivent eux-mêmes leurs décalages, ces informations peuvent être perdues en cas de défaillance d'un consommateur. Par conséquent, les décalages doivent être stockés de manière fiable, de sorte qu’au redémarrage, un consommateur puisse récupérer son ancien décalage et son nouvel article là où il l’a laissé. Dans Kafka, il existe un support intégré pour cela via des commits offset . Le nouveau KafkaConsumer peut valider son décalage actuel sur Kafka et Kafka stocke ces décalages dans une rubrique spéciale appelée __consumer_offsets . Stocker les décalages dans une rubrique Kafka n'est pas seulement tolérant aux pannes, mais permet également de réaffecter des partitions à d'autres consommateurs lors d'un rééquilibrage. Étant donné que tous les consommateurs d'un groupe de consommateurs peuvent accéder à tous les décalages validés de toutes les partitions, lors d'un rééquilibrage, un consommateur qui reçoit une nouvelle partition lit simplement le décalage __consumer_offsets de cette partition à partir de la rubrique __consumer_offsets .

Comment commettre des compensations

KafkaConsumers peut valider automatiquement les décalages en arrière-plan (paramètre de configuration enable.auto.commit = true ) quel est le paramètre par défaut. Ces validations automatiques sont effectuées dans poll() ( généralement appelé dans une boucle ). La fréquence à laquelle les décalages doivent être auto.commit.interval.ms peut être configurée via auto.commit.interval.ms . Comme les validations automatiques sont incorporées dans poll() et que poll() est appelé par le code utilisateur, ce paramètre définit une limite inférieure pour l'intervalle inter-validation.

Comme alternative à la validation automatique, les décalages peuvent également être gérés manuellement. Pour cela, la validation automatique doit être désactivée ( enable.auto.commit = false ). Pour la KafkaConsumers manuelle, KafkaConsumers propose deux méthodes, à savoir commitSync () et commitAsync () . Comme son nom l'indique, commitSync() est un appel bloquant qui retourne après que les décalages ont été commitAsync() , alors que commitAsync() retourne immédiatement. Si vous voulez savoir si une validation a réussi ou non, vous pouvez fournir un gestionnaire de OffsetCommitCallback ( OffsetCommitCallback ) à un paramètre de méthode. Faites attention, dans les deux appels de validation, le consommateur valide les décalages du dernier appel poll() . Par exemple. supposons un sujet de partition unique avec un seul consommateur et le dernier appel à poll() renvoie des messages avec des décalages 4,5,6. Lors de la validation, le décalage 6 sera validé car il s'agit du dernier décalage suivi par le client consommateur. En même temps, commitSync() et commitAsync() permettent plus de contrôle sur le décalage que vous souhaitez valider: si vous utilisez les surcharges correspondantes vous permettant de spécifier une Map<TopicPartition, OffsetAndMetadata> le consommateur ne Map<TopicPartition, OffsetAndMetadata> que les décalages spécifiés (c.-à-d. que la carte peut contenir n'importe quel sous-ensemble de partitions assignées et que le décalage spécifié peut avoir n'importe quelle valeur).

Sémantique des compensations engagées

Un décalage engagé indique que tous les messages jusqu'à ce décalage ont déjà été traités. Ainsi, comme les décalages sont des nombres consécutifs, la validation du décalage X valide implicitement tous les décalages inférieurs à X Par conséquent, il n'est pas nécessaire de valider chaque décalage individuellement et de commettre plusieurs décalages à la fois, mais en validant le plus grand décalage.

Faites attention, car de par sa conception, il est également possible de commettre un décalage plus petit que le dernier décalage engagé. Cela peut être fait si les messages doivent être lus une seconde fois.

Traitement des garanties

L'utilisation de la validation automatique fournit une sémantique de traitement au moins une fois. L'hypothèse sous-jacente est que poll() est uniquement appelé après que tous les messages précédemment livrés ont été traités avec succès. Cela garantit qu'aucun message ne soit perdu car une validation se produit après le traitement. Si un consommateur tombe en panne avant une validation, tous les messages après la dernière validation sont reçus de Kafka et traités à nouveau. Cependant, cette tentative peut entraîner des doublons, car certains messages du dernier appel poll() ont peut-être été traités, mais l'échec s'est produit juste avant l'appel de validation automatique.

Si une sémantique de traitement au plus une fois est requise, la validation automatique doit être désactivée et un commitSync() manuel directement après poll() doit être effectué. Par la suite, les messages sont traités. Cela garantit que les messages sont validés avant leur traitement et ne sont donc jamais lus une seconde fois. Bien sûr, certains messages peuvent être perdus en cas d’échec.

Comment puis-je lire le sujet depuis ses débuts

Il existe plusieurs stratégies pour lire un sujet depuis ses débuts. Pour les expliquer, nous devons d'abord comprendre ce qui se passe au démarrage du consommateur. Au démarrage d'un consommateur, les événements suivants se produisent:

  1. rejoindre le groupe de consommateurs configuré, ce qui déclenche un rééquilibrage et attribue des partitions au consommateur
  2. rechercher les compensations validées (pour toutes les partitions affectées au consommateur)
  3. pour toutes les partitions avec un offset valide, reprendre à partir de ce décalage
  4. pour toutes les partitions avec décalage non valide, définissez le décalage de départ en fonction du paramètre de configuration auto.offset.reset

Démarrer un nouveau groupe de consommateurs

Si vous souhaitez traiter un sujet depuis son début, vous pouvez simplement démarrer un nouveau groupe de consommateurs (par exemple, choisir un group.id inutilisé) et définir auto.offset.reset = earliest . Comme il n'y a pas de décalages validés pour un nouveau groupe, la réinitialisation automatique du décalage sera déclenchée et le sujet sera utilisé dès le début. Faites attention, au redémarrage du consommateur, si vous utilisez à nouveau le même group.id , il ne lira pas le sujet de nouveau, mais reprendra où il est parti. Ainsi, pour cette stratégie, vous devrez attribuer un nouveau group.id chaque fois que vous souhaitez lire un sujet depuis le début.

Réutiliser le même identifiant de groupe

Pour éviter de définir un nouveau group.id chaque fois que vous souhaitez lire un sujet depuis son début, vous pouvez désactiver la validation automatique (via enable.auto.commit = false ) avant de lancer le consommateur pour la toute première fois (en utilisant un group.id inutilisé group.id et paramètre auto.offset.reset = earliest ). De plus, vous ne devez pas commettre de décalage manuellement. Comme les décalages ne sont jamais validés avec cette stratégie, au redémarrage, le consommateur relira le sujet depuis le début.

Cependant, cette stratégie présente deux inconvénients:

  1. ce n'est pas tolérant aux fautes
  2. le rééquilibrage du groupe ne fonctionne pas comme prévu

(1) Comme les décalages ne sont jamais validés, un consommateur défaillant et arrêté est traité de la même manière au redémarrage. Dans les deux cas, le sujet sera consommé dès le début. (2) Le décalage n'étant jamais validé, lors du rééquilibrage, les partitions nouvellement assignées seront consommées dès le début.

Par conséquent, cette stratégie ne fonctionne que pour les groupes de consommateurs avec un seul consommateur et ne devrait être utilisée qu'à des fins de développement.

Réutiliser le même identifiant de groupe et la même validation

Si vous souhaitez être tolérant aux pannes et / ou utiliser plusieurs consommateurs dans votre groupe de consommateurs, la validation des compensations est obligatoire. Ainsi, si vous souhaitez lire un sujet depuis le début, vous devez manipuler les décalages validés au démarrage du consommateur. Pour cela, KafkaConsumer fournit trois méthodes seek() , seekToBeginning() et seekToEnd() . Alors que seek() peut être utilisé pour définir un décalage arbitraire, les deuxième et troisième méthodes peuvent être utilisées pour rechercher respectivement le début et la fin d’une partition. Ainsi, en cas d'échec et de redémarrage, la recherche de redémarrage serait omise et le consommateur pourrait reprendre sa sortie. Pour les utilisateurs de stop-and-restart-from- seekToBeginning() , seekToBeginning() serait appelé explicitement avant d'entrer dans votre boucle poll() . Notez que seekXXX() ne peut être utilisé qu'après qu'un consommateur a rejoint un groupe - il est donc nécessaire d'effectuer un "sondage factice" avant d'utiliser seekXXX() . Le code global serait quelque chose comme ceci:

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow