apache-kafka
Verbrauchergruppen und Offset-Management
Suche…
Parameter
| Parameter | Beschreibung |
|---|---|
| Gruppen-ID | Der Name der Verbrauchergruppe. |
| enable.auto.commit | Offsets automatisch festschreiben; Standardeinstellung: true |
| auto.commit.interval.ms | Die minimale Verzögerung in Millisekunden zwischen to-Commits (erfordert enable.auto.commit=true ); Standardeinstellung: 5000 . |
| auto.offset.reset | Was ist zu tun, wenn kein gültiger festgeschriebener Offset gefunden wurde? voreingestellt: neueste (+) |
| (+) Mögliche Werte | Beschreibung |
| früheste | Setzt den Offset automatisch auf den frühesten Offset zurück. |
| neueste | Setzen Sie den Versatz automatisch auf den neuesten Versatz zurück. |
| keiner | Ausnahme für den Verbraucher auslösen, wenn für die Gruppe des Verbrauchers kein vorheriger Offset gefunden wurde. |
| noch etwas | Ausnahme für den Verbraucher werfen. |
Was ist eine Verbrauchergruppe?
Ab Kafka 0.9 steht der neue KafkaConsumer- Client auf hohem Niveau zur Verfügung. Es nutzt ein neues integriertes Kafka-Protokoll , das die Kombination mehrerer Verbraucher in einer sogenannten Consumer-Gruppe ermöglicht . Eine Consumer Group kann als ein einzelner logischer Consumer bezeichnet werden, der eine Reihe von Themen abonniert. Die Teilbereiche über alle Themen werden den physischen Verbrauchern innerhalb der Gruppe zugeordnet, so dass jedes Patent genau einem Verbraucher zugewiesen wird (ein einzelner Verbraucher kann mehrere Teiltiteln erhalten). Die einzelnen Konsumenten, die zu derselben Gruppe gehören, können auf verschiedenen Hosts verteilt ausgeführt werden.
Verbrauchergruppen werden über ihre group.id identifiziert. Um eine bestimmte group.id zu einer Consumer Group zu machen, genügt es, die Gruppen group.id diesem Client über die Konfiguration des Clients zuzuweisen:
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
Daher bilden alle Verbraucher, die sich mit demselben Kafka-Cluster verbinden und dieselbe group.id , eine Verbrauchergruppe. Verbraucher können jederzeit eine Gruppe verlassen, und neue Verbraucher können jederzeit einer Gruppe beitreten. In beiden Fällen wird eine sogenannte Neuverteilung ausgelöst, und Partitionen werden bei der Consumer-Gruppe neu zugewiesen, um sicherzustellen, dass jede Partition von genau einem Consumer innerhalb der Gruppe verarbeitet wird.
KafkaConsumer , dass auch ein einzelner KafkaConsumer mit sich selbst als Einzelmitglied eine Consumer Group bildet.
Consumer Offset Management und Fehlertoleranz
KafkaConsumers fordern über einen Aufruf von poll() Nachrichten von einem Kafka-Broker an poll() und ihr Fortschritt wird über Offsets verfolgt . Jeder Nachricht in jeder Partition jedes Themas ist ein sogenannter Versatz zugewiesen, dessen logische Folgenummer in der Partition. Ein KafkaConsumer verfolgt seinen aktuellen Versatz für jede ihm zugewiesene Partition. Beachten Sie, dass die Kafka-Broker die aktuellen Offsets der Verbraucher nicht kennen. Daher muss der Verbraucher bei poll() seine aktuellen Offsets an den Broker senden, sodass der Broker die entsprechenden Nachrichten zurücksenden kann. Nachrichten mit einem größeren nachfolgenden Offset. Nehmen wir zum Beispiel an, dass wir ein einzelnes Partitionsthema und einen einzelnen Consumer mit aktuellem Offset 5 haben. Bei poll() sendet der Consumer, wenn ein Offset an den Broker gesendet wird, und der Broker sendet Nachrichten für die Offsets 6,7,8, ...
Da Verbraucher ihre Offsets selbst verfolgen, können diese Informationen verloren gehen, wenn ein Verbraucher ausfällt. Offsets müssen daher zuverlässig gespeichert werden, so dass ein Verbraucher beim Neustart seinen alten Offset aufheben und dort weiterspielen kann, wo er sich befindet. In Kafka ist dies durch Offset-Commits integriert . Der neue KafkaConsumer kann seinen aktuellen Offset an Kafka übergeben, und Kafka speichert diese Offsets in einem speziellen Thema namens __consumer_offsets . Das Speichern der Offsets innerhalb eines Kafka-Themas ist nicht nur fehlertolerant, sondern ermöglicht auch die Neuzuordnung von Partitionen während einer Neuverteilung. Da alle Benutzer einer Consumer-Gruppe auf alle festgeschriebenen Offsets aller Partitionen zugreifen können, liest ein __consumer_offsets , der eine neue Partition zugewiesen bekommt, bei der __consumer_offsets nur den festgeschriebenen Offset dieser Partition aus dem Thema __consumer_offsets und setzt dort fort, wo der alte Consumer __consumer_offsets .
Offsets festlegen
KafkaConsumers können Offsets automatisch im Hintergrund enable.auto.commit = true (Konfigurationsparameter enable.auto.commit = true ). enable.auto.commit = true ist die Standardeinstellung. Diese Auto-Commits werden innerhalb von poll() ( das normalerweise in einer Schleife aufgerufen wird ) ausgeführt. Wie häufig Offsets ausgeführt werden sollen, kann über auto.commit.interval.ms konfiguriert werden. Da Auto-Commits in poll() eingebettet sind und poll() vom Benutzercode aufgerufen wird, definiert dieser Parameter eine Untergrenze für das Inter-Commit-Intervall.
Als Alternative zum automatischen Festschreiben können Offsets auch manuell verwaltet werden. Dafür sollte das automatische enable.auto.commit = false deaktiviert werden ( enable.auto.commit = false ). Für das manuelle KafkaConsumers bietet KafkaConsumers zwei Methoden an, nämlich commitSync () und commitAsync () . Wie der Name schon sagt, ist commitSync() ein blockierender Aufruf, der zurückkehrt, nachdem Offsets erfolgreich festgeschrieben wurden, und commitAsync() kehrt sofort zurück. Wenn Sie wissen möchten, ob ein Commit erfolgreich war oder nicht, können Sie einen Methodenrückrufhandler ( OffsetCommitCallback ) OffsetCommitCallback . Beachten Sie, dass der Verbraucher bei beiden Commit-Aufrufen die Offsets des letzten poll() Aufrufs festlegt. Zum Beispiel. Nehmen wir an, ein einzelnes Partitionsthema mit einem einzelnen Consumer und der letzte Aufruf von poll() Nachrichten mit Offsets 4,5,6 zurück. Beim Commit wird Offset 6 festgeschrieben, da dies der letzte vom Kundenclient verfolgte Offset ist. Gleichzeitig ermöglichen commitSync() und commitAsync() mehr Kontrolle darüber, welchen Offset Sie festlegen möchten: Wenn Sie die entsprechenden Überladungen verwenden, mit denen Sie eine Map<TopicPartition, OffsetAndMetadata> angeben Map<TopicPartition, OffsetAndMetadata> der Map<TopicPartition, OffsetAndMetadata> nur die angegebenen Offsets (dh die Karte kann eine beliebige Teilmenge der zugewiesenen Partitionen enthalten, und der angegebene Versatz kann einen beliebigen Wert haben).
Semantik begangener Offsets
Ein festgeschriebener Offset zeigt an, dass alle Nachrichten bis zu diesem Offset bereits verarbeitet wurden. Da Offsets aufeinanderfolgende Zahlen sind, werden durch das Festlegen des Offsets X implizit alle Offsets kleiner als X . Daher ist es nicht notwendig, jeden Versatz einzeln festzulegen, und mehrere Versätze gleichzeitig zu begehen, dies geschieht jedoch nur der größte Versatz.
Beachten Sie, dass es konstruktionsbedingt auch möglich ist, einen kleineren Offset als den zuletzt festgeschriebenen Offset festzulegen. Dies ist möglich, wenn Nachrichten ein zweites Mal gelesen werden sollen.
Bearbeitungsgarantien
Die Verwendung des automatischen Festschreibens ermöglicht die Verarbeitung von Semantiken. Die zugrunde liegende Annahme ist, dass poll() nur aufgerufen wird, nachdem alle zuvor gelieferten Nachrichten erfolgreich verarbeitet wurden. Dadurch wird sichergestellt, dass keine Nachricht verloren geht, da nach der Verarbeitung ein Commit ausgeführt wird. Wenn ein Consumer vor einem Commit ausfällt, werden alle Nachrichten nach dem letzten Commit von Kafka empfangen und erneut verarbeitet. Dieser Wiederholungsversuch kann jedoch zu Duplikaten führen, da möglicherweise eine Nachricht des letzten poll() Aufrufs verarbeitet wurde, der Fehler jedoch unmittelbar vor dem Auto-Commit-Aufruf aufgetreten ist.
Wenn eine Bearbeitungssemantik erforderlich ist, muss das automatische commitSync() deaktiviert werden, und ein commitSync() direkt nach dem commitSync() poll() sollte ausgeführt werden. Danach werden Nachrichten verarbeitet. Dadurch wird sichergestellt, dass Nachrichten festgeschrieben werden, bevor sie verarbeitet werden, und somit niemals ein zweites Mal gelesen werden. Natürlich kann eine Nachricht im Fehlerfall verloren gehen.
Wie kann ich das Thema von Anfang an lesen?
Es gibt mehrere Strategien, um ein Thema von Anfang an zu lesen. Um dies zu erklären, müssen wir zuerst verstehen, was beim Start des Verbrauchers geschieht. Beim Start eines Verbrauchers geschieht Folgendes:
- treten Sie der konfigurierten Verbrauchergruppe bei, die eine Neuverteilung auslöst und dem Verbraucher Partitionen zuweist
- nach festgeschriebenen Offsets suchen (für alle Partitionen, die dem Verbraucher zugewiesen wurden)
- Für alle Partitionen mit gültigem Offset setzen Sie diesen Offset fort
-
auto.offset.resetfür alle Partitionen mit ungültigem Offset den Start-Offset gemäß dem Konfigurationsparameterauto.offset.reset
Starten Sie eine neue Consumer Group
Wenn Sie ein Thema von Anfang an group.id möchten, können Sie einfach eine neue Verbrauchergruppe starten (dh eine nicht verwendete group.id ) und auto.offset.reset = earliest . Da für eine neue Gruppe keine festgeschriebenen Offsets vorhanden sind, wird das automatische Offset-Reset ausgelöst, und das Thema wird von Anfang an verwendet. group.id Sie, dass beim Neustart des Verbrauchers, wenn Sie dieselbe group.id erneut verwenden, das Thema nicht von group.id wieder gelesen wird, sondern dort group.id wird, wo es group.id . Für diese Strategie müssen Sie group.id jedes Mal, wenn Sie ein Thema lesen möchten, eine neue group.id zuweisen.
Verwenden Sie dieselbe Gruppen-ID erneut
Um zu vermeiden, group.id jedes Mal, wenn Sie ein Thema von Anfang an lesen möchten, eine neue group.id , können Sie die automatische enable.auto.commit = false deaktivieren (über enable.auto.commit = false ), bevor Sie den Consumer zum ersten Mal starten (mithilfe einer nicht verwendeten group.id und Einstellung auto.offset.reset = earliest ). Darüber hinaus sollten Sie keine Offsets manuell festlegen. Da Offsets niemals mit dieser Strategie festgelegt werden, liest der Verbraucher das Thema beim Neustart erneut von Anfang an.
Diese Strategie hat jedoch zwei Nachteile:
- es ist nicht fehlertolerant
- Gruppengleichgewicht funktioniert nicht wie beabsichtigt
(1) Da Offsets niemals festgeschrieben werden, werden ein fehlerhafter und ein gestoppter Consumer beim Neustart auf dieselbe Weise behandelt. In beiden Fällen wird das Thema von Anfang an verwendet. (2) Da Offset niemals festgeschrieben wird, werden neu zugewiesene Partitionen bei der Neuverteilung von Anfang an Verbraucher.
Daher funktioniert diese Strategie nur für Verbrauchergruppen mit einem einzelnen Verbraucher und sollte nur für Entwicklungszwecke verwendet werden.
Verwenden Sie dieselbe Gruppen-ID und Commit erneut
Wenn Sie fehlertolerant sein möchten und / oder mehrere Verbraucher in Ihrer Verbrauchergruppe verwenden möchten, müssen Sie Offsets festlegen. Wenn Sie also ein Thema von Anfang an lesen möchten, müssen Sie festgelegte Offsets beim Start des Konsumenten manipulieren. Dafür bietet KafkaConsumer drei Methoden seek() , seekToBeginning() und seekToEnd() . Während seek() verwendet werden kann, um einen beliebigen Versatz festzulegen, können die zweite und dritte Methode verwendet werden, um den Anfang bzw. das Ende einer Partition zu suchen. Bei einem Ausfall und bei einem Neustart des Verbrauchers würde somit das Suchen entfallen, und der Verbraucher kann dort fortfahren, wo er aufgehört hat. Für Consumer-stop-and- seekToBeginning() -from-Beginn wird seekToBeginning() explizit aufgerufen, bevor Sie Ihre poll() Schleife eingeben. Beachten Sie, dass seekXXX() nur verwendet werden kann, nachdem ein Verbraucher einer Gruppe seekXXX() . seekXXX() vor der Verwendung von seekXXX() eine "Dummy-Abfrage" durchgeführt werden. Der Gesamtcode würde ungefähr so aussehen:
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}