apache-kafka
Consumentengroepen en offsetbeheer
Zoeken…
parameters
| Parameter | Beschrijving |
|---|---|
| group.id | De naam van de consumentengroep. |
| enable.auto.commit | Offsets automatisch vastleggen; standaard: waar . |
| auto.commit.interval.ms | De minimale vertraging in milliseconden tussen naar commits (vereist enable.auto.commit=true ); standaard: 5000 . |
| auto.offset.reset | Wat te doen als er geen geldige gecommitteerde compensatie is gevonden; standaard: laatste . (+) |
| (+) Mogelijke waarden | Beschrijving |
| vroegste | Reset de offset automatisch naar de vroegste offset. |
| laatste | Reset de offset automatisch naar de nieuwste offset. |
| geen | Gooi uitzondering naar de consument als er geen eerdere compensatie wordt gevonden voor de groep van de consument. |
| nog iets anders | Gooi uitzondering naar de consument. |
Wat is een consumentengroep
Vanaf Kafka 0.9 is de nieuwe KafkaConsumer- client op hoog niveau beschikbaar. Het maakt gebruik van een nieuw ingebouwd Kafka-protocol dat het mogelijk maakt om meerdere consumenten te combineren in een zogenaamde Consumer Group . Een consumentengroep kan worden beschreven als een enkele logische consument die zich abonneert op een reeks onderwerpen. De delen over alle onderwerpen worden toegewezen aan de fysieke consumenten binnen de groep, zodat elk patitie wordt toegewezen aan één consument (een enkele consument kan meerdere partijen toegewezen krijgen). De individuele consumenten die tot dezelfde groep behoren, kunnen op verschillende hosts gedistribueerd werken.
Consumentengroepen worden geïdentificeerd via hun group.id . Om een specifiek client-exemplaar lid te maken van een consumentengroep, volstaat het om de group.id toe te wijzen aan deze client, via de configuratie van de client:
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
Dus alle consumenten die aansluiten op dezelfde Kafka cluster en maken gebruik van dezelfde group.id vormen een Consumer Group. Consumenten kunnen op elk moment een groep verlaten en nieuwe consumenten kunnen op elk moment lid worden van een groep. In beide gevallen wordt een zogenaamde rebalance geactiveerd en worden partities opnieuw toegewezen aan de Consumer Group om ervoor te zorgen dat elke partitie wordt verwerkt door één consument binnen de groep.
Let op, zelfs een enkele KafkaConsumer vormt een consumentengroep met zichzelf als één lid.
Consumentencompensatiebeheer en fouttolerantie
KafkaConsumers vragen berichten van een Kafka-makelaar via een aanroep naar poll() en hun voortgang wordt gevolgd via offsets . Aan elk bericht binnen elke partitie van elk onderwerp is een zogenaamde offset toegewezen - het logische volgnummer binnen de partitie. Een KafkaConsumer volgt zijn huidige offset voor elke partitie die eraan is toegewezen. Let op, de Kafka-makelaars zijn niet op de hoogte van de huidige compensaties van de consumenten. Dus op poll() de consument zijn huidige offsets naar de broker sturen, zodat de broker de bijbehorende berichten kan retourneren, dwz. berichten met een grotere opeenvolgende offset. Laten we bijvoorbeeld aannemen dat we een enkel partitiethema hebben en een enkele consument met huidige offset 5. Op poll() stuurt de consument indien offset naar de makelaar en de broker retourneert berichten voor offsets 6,7,8, ...
Omdat consumenten zelf hun offsets volgen, kan deze informatie verloren gaan als een consument faalt. Offsets moeten dus op een betrouwbare manier worden opgeslagen, zodat een consument bij het opnieuw opstarten zijn oude offset kan ophalen en verder kan gaan waar hij was gebleven. In Kafka is hiervoor ingebouwde ondersteuning via offset commits . De nieuwe KafkaConsumer kan zijn huidige offset vastleggen aan Kafka en Kafka slaat die offsets op in een speciaal onderwerp genaamd __consumer_offsets . Het opslaan van de offsets binnen een Kafka-onderwerp is niet alleen fouttolerant, maar maakt het ook mogelijk om partities opnieuw toe te wijzen aan andere consumenten tijdens een herbalancering. Omdat alle consumenten van een consumentengroep toegang hebben tot alle gecommitteerde offsets van alle partities, leest een consument die een nieuwe partitie toegewezen krijgt, net de gecommitteerde offset van deze partitie uit het onderwerp __consumer_offsets en gaat verder waar de oude consument was __consumer_offsets .
Offsets plegen
KafkaConsumers kunnen offsets automatisch op de achtergrond vastleggen (configuratieparameter enable.auto.commit = true ) wat is de standaardinstelling. Die auto commits worden gedaan binnen poll() ( dit wordt meestal in een lus genoemd ). Hoe vaak offsets moeten worden gepleegd, kan worden geconfigureerd via auto.commit.interval.ms . Omdat auto commits zijn ingebed in poll() en poll() wordt aangeroepen door de gebruikerscode, definieert deze parameter een ondergrens voor het inter-commit-interval.
Als alternatief voor automatisch vastleggen kunnen offsets ook handmatig worden beheerd. Hiervoor moet auto commit worden uitgeschakeld ( enable.auto.commit = false ). Voor handmatig committeren biedt KafkaConsumers twee methoden, namelijk commitSync () en commitAsync () . Zoals de naam aangeeft, is commitSync() een blokkerende aanroep die terugkeert nadat offsets met succes zijn gecommit, terwijl commitAsync() onmiddellijk terugkeert. Als je wilt weten of een commit succesvol was of niet, kun je een OffsetCommitCallback handler ( OffsetCommitCallback ) een OffsetCommitCallback . Let op, dat in beide commit-oproepen de consument de offsets van de laatste poll() -oproep pleegt. Bijvoorbeeld. laten we uitgaan van een enkel partitieonderwerp met een enkele consument en de laatste oproep om poll() berichten met offsets 4,5,6 te retourneren. Bij vastleggen wordt offset 6 vastgelegd omdat dit de nieuwste offset is die door de klant wordt gevolgd. Tegelijkertijd geven zowel commitSync() als commitAsync() meer controle over welke offset u wilt vastleggen: als u de overeenkomstige overbelastingen gebruikt waarmee u een Map<TopicPartition, OffsetAndMetadata> kunt opgeven Map<TopicPartition, OffsetAndMetadata> de consument alleen de opgegeven offsets vastleggen (dwz de kaart kan elke subset van toegewezen partities bevatten en de opgegeven offset kan elke waarde hebben).
Semantiek van toegewijde offsets
Een gecommitteerde offset geeft aan dat alle berichten tot deze offset al zijn verwerkt. Omdat offsets opeenvolgende getallen zijn, worden door offsets X impliciet alle offsets kleiner dan X vastgelegd. Daarom is het niet nodig om elke offset afzonderlijk te plegen, en het plegen van meerdere offsets tegelijk, maar alleen de grootste offset.
Let op, het is door het ontwerp ook mogelijk om een kleinere offset te maken dan de laatste gecommitteerde offset. Dit kan worden gedaan als berichten een tweede keer moeten worden gelezen.
Verwerkingsgaranties
Het gebruik van auto commit biedt minstens één keer semantiek. De onderliggende veronderstelling is dat poll() alleen wordt aangeroepen nadat alle eerder afgeleverde berichten met succes zijn verwerkt. Dit zorgt ervoor dat er geen berichten verloren gaan omdat een commit na verwerking plaatsvindt. Als een consument faalt vóór een commit, worden alle berichten na de laatste commit van Kafka ontvangen en opnieuw verwerkt. Deze nieuwe poging kan echter leiden tot duplicaten, omdat sommige berichten van de laatste poll() -oproep zijn verwerkt, maar de fout is opgetreden vlak voor de auto-commit-aanroep.
Als er maximaal één keer semantiek moet worden verwerkt, moet automatisch vastleggen worden uitgeschakeld en moet een handmatige commitSync() direct na poll() worden uitgevoerd. Nadien worden berichten verwerkt. Dit zorgt ervoor dat berichten worden vastgelegd voordat ze worden verwerkt en dus nooit een tweede keer worden gelezen. Natuurlijk kan een bericht verloren gaan in geval van een storing.
Hoe kan ik het onderwerp vanaf het begin lezen
Er zijn meerdere strategieën om een onderwerp vanaf het begin te lezen. Om dat uit te leggen, moeten we eerst begrijpen wat er gebeurt bij het opstarten van de consument. Bij het opstarten van een consument gebeurt het volgende:
- sluit u aan bij de geconfigureerde consumentengroep, die een nieuwe balans activeert en partities toewijst aan de consument
- zoeken naar toegewijde offsets (voor alle partities die aan de consument zijn toegewezen)
- voor alle partities met geldige offset, hervat vanaf deze offset
- voor alle partities met ongeldige offset, stel start-offset in volgens
auto.offset.resetconfiguratieparameter
Start een nieuwe consumentengroep
Als u een onderwerp vanaf het begin wilt verwerken, kunt u eenvoudig een nieuwe consumentengroep starten (bijvoorbeeld een ongebruikte group.id ) en auto.offset.reset = earliest . Omdat er geen gecommitteerde offsets zijn voor een nieuwe groep, wordt automatische offset-reset geactiveerd en wordt het onderwerp vanaf het begin gebruikt. Let op, als je bij het opnieuw opstarten van de consument, als je dezelfde group.id opnieuw gebruikt, het onderwerp niet vanaf het begin opnieuw leest, maar hervat waar het group.id was. Voor deze strategie moet u dus een nieuwe groep group.id Telkens wanneer u een onderwerp vanaf het begin wilt lezen.
Gebruik dezelfde groeps-ID opnieuw
Om te voorkomen dat group.id elke keer dat u een onderwerp vanaf het begin wilt lezen een nieuwe group.id , kunt u auto commit uitschakelen (via enable.auto.commit = false ) voordat u de consument voor het eerst start (met een ongebruikte group.id en instelling auto.offset.reset = earliest ). Bovendien moet u geen offsets handmatig vastleggen. Omdat offsets nooit worden vastgelegd met behulp van deze strategie, zal de consument bij het opnieuw opstarten het onderwerp vanaf het begin opnieuw lezen.
Deze strategie heeft echter twee nadelen:
- het is niet fouttolerant
- groepsherstel werkt niet zoals bedoeld
(1) Omdat offsets nooit worden gecommitteerd, worden een falende en een gestopte consument bij het opnieuw opstarten op dezelfde manier behandeld. In beide gevallen wordt het onderwerp vanaf het begin gebruikt. (2) Omdat compensatie nooit wordt vastgelegd, zullen nieuw toegewezen partities bij het opnieuw in evenwicht brengen vanaf het begin consument zijn.
Daarom werkt deze strategie alleen voor consumentengroepen met één consument en mag deze alleen worden gebruikt voor ontwikkelingsdoeleinden.
Hergebruik dezelfde Groeps-ID en Commit
Als u fouttolerant wilt zijn en / of meerdere consumenten in uw consumentengroep wilt gebruiken, is het verplicht om offsets te plegen. Dus als u een onderwerp vanaf het begin wilt lezen, moet u gecommitteerde offsets manipuleren bij het opstarten van de consument. Hiervoor biedt KafkaConsumer drie methoden seek() , seekToBeginning() en seekToEnd() . Hoewel seek() kan worden gebruikt om een willekeurige offset in te stellen, kunnen de tweede en derde methode worden gebruikt om respectievelijk naar het begin of einde van een partitie te zoeken. Dus bij mislukking en bij herstart zou het zoeken van consumenten worden weggelaten en kan de consument verdergaan waar hij was gebleven. Voor consument-stop-en-herstart-vanaf-begin, zou seekToBeginning() expliciet worden aangeroepen voordat u uw poll() -lus binnengaat. Houd er rekening mee dat seekXXX() alleen kan worden gebruikt nadat een consument lid is geworden van een groep. Daarom is het vereist om een "dummy-poll" uit te voeren voordat seekXXX() . De algemene code zou er ongeveer zo uitzien:
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}