Sök…


parametrar

Parameter Beskrivning
group.id Namnet på konsumentgruppen.
enable.auto.commit Utför automatiskt offset; standard: sant .
auto.commit.interval.ms Den minsta förseningen i millisekunder mellan till åtaganden (kräver enable.auto.commit=true ); standard: 5000 .
auto.offset.reset Vad gör man när det inte finns någon giltig förpliktad offset; standard: senaste . (+)
(+) Möjliga värden Beskrivning
tidigast Återställ offset automatiskt till den tidigaste förskjutningen.
senast Återställ offset automatiskt till den senaste offset.
ingen Kasta undantag till konsumenten om ingen tidigare kompensation hittas för konsumentens grupp.
något annat Kasta undantag till konsumenten.

Vad är en konsumentgrupp

Från och med Kafka 0.9 är den nya höga KafkaConsumer- klienten tillgänglig. Den utnyttjar ett nytt inbyggt Kafka-protokoll som gör det möjligt att kombinera flera konsumenter i en så kallad Consumer Group . En konsumentgrupp kan beskrivas som en enda logisk konsument som prenumererar på en uppsättning ämnen. Partitionerna över alla ämnen överensstämmer med de fysiska konsumenterna i gruppen, så att varje patition tilldelas exaclty en konsument (en enda konsument kan få flera partitoner tilldelade). De enskilda konsumenterna som tillhör samma grupp kan köra på olika värdar på ett distribuerat sätt.

Konsumentgrupper identifieras via deras group.id . För att göra en specifik klientinstans medlem av en konsumentgrupp räcker det att tilldela group.id till denna klient, via klientens konfiguration:

Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);

Således bildar alla konsumenter som ansluter till samma Kafka-kluster och använder samma group.id en konsumentgrupp. Konsumenter kan lämna en grupp när som helst och nya konsumenter kan gå med i en grupp när som helst. I båda fallen utlöses en så kallad rebalance och partitioner överförs med konsumentgruppen för att säkerställa att varje partition behandlas av exakt en konsument inom gruppen.

Var uppmärksam på att även en enda KafkaConsumer bildar en konsumentgrupp med sig själv som ensammedlem.

Konsument Offset Management och Fault-Tolerance

KafkaConsumers begär meddelanden från en Kafka-mäklare via en uppmaning poll() och deras framsteg spåras via offset . Varje meddelande inom varje partition i varje ämne har en så kallad offset tilldelad - dess logiska sekvensnummer inom partitionen. En KafkaConsumer spårar sin nuvarande offset för varje partition som tilldelas den. Var uppmärksam på att Kafka-mäklarna inte känner till konsumenternas nuvarande kompensation. På poll() måste således konsumenten skicka sina nuvarande offset till mäklaren, så att mäklaren kan returnera motsvarande meddelanden, dvs. meddelanden med större offset i följd. Låt oss till exempel anta att vi har ett enda partitionsämne och en enda konsument med nuvarande offset 5. Vid poll() skickar konsumenten om offset till mäklaren och mäklaren returnerar meddelanden för offset 6,7,8, ...

Eftersom konsumenterna spårar sina kompensationer själva kan denna information gå vilse om en konsument misslyckas. Därför måste förskjutningar lagras pålitligt så att konsumenten vid omstart kan hämta sin gamla offset och resumerera där den var kvar. I Kafka finns det inbyggt stöd för detta via offsetåtaganden . Den nya KafkaConsumer kan begå sin nuvarande offset till Kafka och Kafka lagrar dessa förskjutningar i ett speciellt ämne som heter __consumer_offsets . Att lagra kompensationerna inom ett Kafka-ämne är inte bara feltoleranta, utan tillåter också tilldela partitioner till andra konsumenter under en rebalans också. Eftersom alla konsumenter i en konsumentgrupp kan få åtkomst till alla begåvade förskjutningar av alla partitioner, vid återbalansering, läser en konsument som får en ny partition tilldelad bara den engagerade kompensationen av denna partition från __consumer_offsets ämne och återupptas där den gamla konsumenten kvar.

Hur man begår offset

KafkaConsumers kan göra offset automatiskt i bakgrunden (konfigurationsparameteren enable.auto.commit = true ) vad som är standardinställningen. De automatiska åtagandena görs inom poll() ( som vanligtvis kallas i en slinga ). Hur ofta offset ska begås kan konfigureras via auto.commit.interval.ms . Eftersom auto-commits är inbäddade i poll() och poll() kallas av användarkoden, definierar denna parameter en undre gräns för inter-commit-intervallet.

Som ett alternativ till automatisk åtagande kan förskjutningar också hanteras manuellt. För detta bör enable.auto.commit = false inaktiveras ( enable.auto.commit = false ). För manuell KafkaConsumers erbjuder KafkaConsumers två metoder, nämligen commitSync () och commitAsync () . Som namnet indikerar är commitSync() ett blockerande samtal som kommer tillbaka efter att offset har begått framgångsrikt, medan commitAsync() återgår omedelbart. Om du vill veta om ett åtagande lyckades eller inte, kan du tillhandahålla en OffsetCommitCallback ( OffsetCommitCallback ) en OffsetCommitCallback . Var uppmärksam på att konsumenten i båda begär samtal gör offset för den senaste samtalet poll() . Till exempel. låt oss anta ett enda partitionsämne med en enda konsument och det sista samtalet poll() returnerar meddelanden med offset 4,5,6. Vid engagemang kommer offset 6 att begås eftersom detta är den senaste offset som spåras av konsumentklienten. Samtidigt commitSync() både commitSync() och commitAsync() mer kontroll över vilken förskjutning du vill begå: om du använder motsvarande överbelastningar som gör att du kan ange en Map<TopicPartition, OffsetAndMetadata> kommer konsumenten endast att begå de angivna offseterna (dvs. kartan kan innehålla valfri delmängd av tilldelade partitioner, och den angivna offseten kan ha valfritt värde).

Semantik av engagerade offset

En begiven offset indikerar att alla meddelanden upp till denna offset redan har behandlats. Eftersom förskjutningar är på varandra följande antal begår offset X implicit alla förskjutningar som är mindre än X Därför är det inte nödvändigt att begå varje förskjutning individuellt och att utföra flera förskjutningar samtidigt, händer utan bara göra den största förskjutningen.

Var uppmärksam på att det genom design också är möjligt att begå en mindre offset än den senast begagnade förskjutningen. Detta kan göras om meddelanden ska läsas en gång till.

Bearbetar garantier

Att använda auto commit ger minst en gång behandlingssemantik. Det underliggande antagandet är att poll() bara anropas efter att alla tidigare levererade meddelanden har behandlats framgångsrikt. Detta säkerställer att inget meddelande går vilse, eftersom ett åtagande inträffar efter bearbetning. Om en konsument misslyckas före ett åtagande, mottas alla meddelanden efter det senaste åtagandet från Kafka och behandlas igen. Detta försök kan emellertid resultera i dubbletter, eftersom vissa meddelanden från det senaste samtalet poll() -samtalet kan ha behandlats men felet inträffade precis innan det automatiska åtagandesamtalet.

Om högst en gång bearbetningssemantik krävs, måste commitSync() inaktiveras och en manuell commitSync() direkt efter poll() bör göras. Efteråt behandlas meddelanden. Detta säkerställer att meddelanden begås innan de behandlas och därför aldrig läses en andra gång. Naturligtvis kan vissa meddelanden gå vilse i fall av misslyckande.

Hur kan jag läsa ämnet från början

Det finns flera strategier för att läsa ett ämne från början. För att förklara dem måste vi först förstå vad som händer vid konsumentstart. Vid uppstart av en konsument händer följande:

  1. gå med i den konfigurerade konsumentgruppen, som utlöser en rebalans och tilldelar partitioner till konsumenten
  2. leta efter begåvade kompensationer (för alla partitioner som tilldelats konsumenten)
  3. för alla partitioner med giltig offset, fortsätt från denna offset
  4. för alla partitioner som inte har giltig offset, ställ in startförskjutning enligt parametern auto.offset.reset

Starta en ny konsumentgrupp

Om du vill bearbeta ett ämne från början kan du enkelt starta en ny konsumentgrupp (dvs. välja en oanvänd group.id ) och ställa auto.offset.reset = earliest . Eftersom det inte finns några offsetförskjutningar för en ny grupp, kommer automatisk offset-återställning att utlösas och ämnet kommer att konsumeras från början. Var uppmärksam på att om du använder samma group.id nytt om du använder samma group.id kommer det inte att läsa ämnet från början igen, men fortsätta där det var kvar. För denna strategi måste du alltså tilldela en ny group.id varje gång du vill läsa ett ämne från början.

Återanvänd samma grupp-ID

För att undvika att ställa in en ny group.id varje gång du vill läsa ett ämne från början, kan du inaktivera auto commit (via enable.auto.commit = false ) innan du startar konsumenten för första gången (med en oanvänd group.id och inställning auto.offset.reset = earliest ). Dessutom bör du inte göra några kompensationer manuellt. Eftersom offset aldrig begås med denna strategi kommer konsumenten vid omstart att läsa ämnet från början igen.

Emellertid har denna strategi två nackdelar:

  1. det är inte feltolerant
  2. grupprebalans fungerar inte som avsett

(1) Eftersom offset aldrig begås, hanteras en misslyckad och en stoppad konsument på samma sätt vid omstart. I båda fallen kommer ämnet att konsumeras från början. (2) Eftersom förskjutning aldrig begås, kommer nytilldelade partitioner att återupprätta konsumtion från början.

Därför fungerar denna strategi endast för konsumentgrupper med en enda konsument och bör endast användas för utvecklingsändamål.

Återanvänd samma grupp-ID och åtagande

Om du vill vara feltolerant och / eller använda flera konsumenter i din konsumentgrupp är det obligatoriskt att utföra kompensationer. Således, om du vill läsa ett ämne från början, måste du manipulera engagerade offset vid konsumentstart. För detta ger KafkaConsumer tre metoder seek() , seekToBeginning() och seekToEnd() . Medan seek() kan användas för att ställa in en godtycklig offset, kan den andra och tredje metoden användas för att söka till början eller slutet av en partition. Således, om misslyckande och om konsumentens omstart skulle sökande utelämnas och konsumenten kan återuppta där den lämnade. För konsument-stop-and-start-från-början, seekToBeginning() skulle kallas uttryckligen innan du går in i din poll() -slinga. Observera att seekXXX() endast kan användas efter att en konsument har anslutit sig till en grupp - det krävs därför en "dummy-poll" innan du använder seekXXX() . Den övergripande koden skulle vara något liknande:

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow