Ricerca…


Parametri

Parametro Descrizione
group.id Il nome del gruppo di consumatori.
enable.auto.commit Commettere automaticamente offset; impostazione predefinita: true .
auto.commit.interval.ms Il ritardo minimo in millisecondi tra le commit (richiede enable.auto.commit=true ); impostazione predefinita: 5000 .
auto.offset.reset Cosa fare quando non è stato trovato alcun offset valido commesso; default: ultimo . (+)
(+) Valori possibili Descrizione
più presto Ripristina automaticamente l'offset sul primo offset.
più recente Ripristina automaticamente l'offset sull'offset più recente.
nessuna Lanciare un'eccezione al consumatore se non viene trovato nessun offset precedente per il gruppo del consumatore.
qualunque altra cosa Lanciare un'eccezione al consumatore.

Cos'è un gruppo di consumatori

A partire da Kafka 0.9, è disponibile il nuovo client di alto livello di KafkaConsumer . Sfrutta un nuovo protocollo Kafka integrato che consente di unire più utenti in un cosiddetto Consumer Group . Un gruppo di consumatori può essere descritto come un singolo consumatore logico che si iscrive a una serie di argomenti. Le partions su tutti gli argomenti sono assegnate ai consumatori fisici all'interno del gruppo, in modo tale che ogni patition sia assegnata a un utente esatto (un singolo consumatore può ottenere più partitoni assegnati). I singoli consumatori appartenenti allo stesso gruppo possono essere eseguiti su host diversi in modo distribuito.

I gruppi di consumatori sono identificati tramite il loro group.id . Per creare un membro di istanza client specifico di un gruppo di consumatori, è sufficiente assegnare i gruppi group.id a questo client, tramite la configurazione del client:

Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);

Pertanto, tutti i consumatori che si connettono allo stesso cluster Kafka e utilizzano lo stesso group.id formano un gruppo di consumatori. I consumatori possono lasciare un gruppo in qualsiasi momento e i nuovi consumatori possono unirsi a un gruppo in qualsiasi momento. In entrambi i casi, viene attivato un cosiddetto ribilanciamento e le partizioni vengono riassegnate al Consumer Group per garantire che ciascuna partizione venga elaborata da un solo consumatore all'interno del gruppo.

Presta attenzione, che anche un singolo KafkaConsumer forma un gruppo di consumatori con se stesso come membro singolo.

Gestione degli offset dei clienti e tolleranza agli errori

I clienti di Kafka richiedono i messaggi da un broker Kafka tramite una chiamata al poll() e il loro avanzamento viene tracciato tramite gli offset . Ogni messaggio all'interno di ogni partizione di ogni argomento, ha un cosiddetto offset assegnato, il suo numero di sequenza logica all'interno della partizione. Un KafkaConsumer tiene traccia del suo attuale offset per ogni partizione che gli viene assegnata. Presta attenzione, che i broker di Kafka non sono a conoscenza delle attuali compensazioni dei consumatori. Pertanto, nel poll() il consumatore deve inviare le sue correzioni correnti al broker, in modo tale che il broker possa restituire i messaggi corrispondenti, cioè ,. messaggi con un offset consecutivo maggiore. Ad esempio, supponiamo di avere un argomento con partizione singola e un singolo utente con offset corrente 5. Nel poll() il consumatore invia se offset al broker e il broker restituisce i messaggi per gli offset 6,7,8, ...

Poiché i consumatori tracciano da soli i propri offset, queste informazioni potrebbero andare perse se un consumatore fallisce. Pertanto, gli offset devono essere memorizzati in modo affidabile, in modo tale che al riavvio, un utente possa prelevare il suo vecchio offset e resumer da dove è rimasto. In Kafka, c'è un supporto integrato per questo tramite i commit di offset . Il nuovo KafkaConsumer può KafkaConsumer suo attuale offset a Kafka e Kafka memorizza tali offset in un argomento speciale chiamato __consumer_offsets . La memorizzazione degli offset all'interno di un argomento di Kafka non è solo tollerante ai guasti, ma consente anche di riassegnare le partizioni ad altri utenti durante un ribilanciamento. Poiché tutti i consumatori di un gruppo di consumatori possono accedere a tutte le correzioni impegnate di tutte le partizioni, in caso di ribilanciamento, un utente a cui viene assegnata una nuova partizione legge l'offset impegnato di questa partizione __consumer_offsets e riprende da dove era rimasto il vecchio utente.

Come commettere offset

KafkaConsumers può eseguire automaticamente il commit degli offset in background (parametro di configurazione enable.auto.commit = true ) qual è l'impostazione predefinita. Questi commit automatici vengono eseguiti all'interno di poll() ( che in genere viene chiamato in un ciclo ). La frequenza con cui devono essere commessi gli offset, può essere configurata tramite auto.commit.interval.ms . Poiché i commit automatici sono incorporati in poll() e poll() viene chiamato dal codice utente, questo parametro definisce un limite inferiore dell'inter-commit-interval.

In alternativa al commit automatico, gli offset possono anche essere gestiti manualmente. Per questo, il commit automatico dovrebbe essere disabilitato ( enable.auto.commit = false ). Per il commit manuale KafkaConsumers offre due metodi, ovvero commitSync () e commitAsync () . Come indica il nome, commitSync() è una chiamata bloccante, che ritorna dopo che gli offset sono stati commessi correttamente, mentre commitAsync() restituisce immediatamente. Se vuoi sapere se un commit ha avuto successo o no, puoi fornire un gestore di chiamata ( OffsetCommitCallback ) un parametro di metodo. Prestare attenzione, che in entrambe le chiamate di commit, il consumatore commette gli scostamenti dall'ultima chiamata poll() . Per esempio. supponiamo che un argomento di singola partizione con un singolo consumatore e l'ultima chiamata al poll() restituisca i messaggi con offset 4,5,6. In caso di commit, l'offset 6 verrà eseguito perché questo è l'ultimo offset tracciato dal cliente consumatore. Allo stesso tempo, sia commitSync() che commitAsync() consentono un maggiore controllo su quale offset si desidera eseguire il commit: se si utilizzano gli overload corrispondenti che consentono di specificare una Map<TopicPartition, OffsetAndMetadata> il consumer impegna solo gli offset specificati (cioè, la mappa può contenere qualsiasi sottoinsieme di partizioni assegnate e l'offset specificato può avere qualsiasi valore).

Semantica di compensazioni commesse

Un offset impegnato indica che tutti i messaggi fino a questo offset sono già stati elaborati. Pertanto, poiché gli offset sono numeri consecutivi, l'offset X commette implicitamente tutti gli offset inferiori a X Pertanto, non è necessario eseguire il commit di ciascun offset singolarmente e commettere più offset contemporaneamente, ma commettere solo l'offset maggiore.

Prestare attenzione, che in base alla progettazione è anche possibile impegnare un offset minore rispetto all'ultimo offset impegnato. Questo può essere fatto, se i messaggi dovrebbero essere letti una seconda volta.

Garanzie di elaborazione

L'uso del commit automatico fornisce una semantica di elaborazione almeno una volta. L'ipotesi sottostante è che poll() viene chiamato solo dopo che tutti i messaggi consegnati in precedenza sono stati elaborati correttamente. Ciò garantisce che nessun messaggio venga perso perché un commit avviene dopo l' elaborazione. Se un utente fallisce prima di un commit, tutti i messaggi successivi all'ultimo commit vengono ricevuti da Kafka e processati nuovamente. Tuttavia, questo nuovo tentativo potrebbe comportare duplicati, poiché potrebbe essere stato elaborato un messaggio dall'ultima chiamata poll() , ma l'errore si è verificato subito prima della chiamata di commit automatico.

Se sono richieste semantiche di elaborazione al massimo una volta, il commit automatico deve essere disabilitato e deve essere commitSync() manualmente un commitSync() direttamente dopo il poll() . In seguito, i messaggi vengono elaborati. Ciò garantisce che i messaggi vengano inoltrati prima che vengano elaborati e quindi non vengano mai letti una seconda volta. Ovviamente, alcuni messaggi potrebbero andare persi in caso di errore.

Come posso leggere l'argomento dall'inizio

Ci sono più strategie per leggere un argomento dall'inizio. Per spiegarli, dobbiamo prima capire cosa succede all'avvio dei consumatori. All'avvio di un consumatore, accade quanto segue:

  1. aderire al gruppo di consumatori configurato, che attiva un ribilanciamento e assegna le partizioni al consumatore
  2. cerca offset compensati (per tutte le partizioni che sono state assegnate al consumatore)
  3. per tutte le partizioni con offset valido, riprendere da questo offset
  4. per tutte le partizioni con offset non valido, impostare l'offset iniziale in base al parametro di configurazione auto.offset.reset

Inizia un nuovo gruppo di consumatori

Se si desidera elaborare un argomento dall'inizio, è possibile avviare semplicemente un nuovo gruppo di consumatori (ad esempio, selezionare un group.id inutilizzato) e impostare auto.offset.reset = earliest . Poiché non ci sono offset assegnati per un nuovo gruppo, verrà attivato il ripristino dell'offset automatico e l'argomento verrà utilizzato dall'inizio. Fai attenzione, che al riavvio del consumatore, se usi di nuovo lo stesso group.id , non leggerà l'argomento dall'inizio, ma riprenderà da dove è rimasto. Pertanto, per questa strategia, dovrai assegnare un nuovo group.id ogni volta che desideri leggere un argomento dall'inizio.

Riutilizzare lo stesso ID di gruppo

Per evitare di impostare un nuovo group.id ogni volta che si desidera leggere un argomento dall'inizio, è possibile disabilitare il commit automatico (tramite enable.auto.commit = false ) prima di avviare il consumer per la prima volta (utilizzando un group.id non utilizzato group.id e impostazione auto.offset.reset = earliest ). Inoltre, non si dovrebbe commettere manualmente alcuna compensazione. Poiché gli offset non vengono mai commessi utilizzando questa strategia, al riavvio il consumatore leggerà nuovamente l'argomento dall'inizio.

Tuttavia, questa strategia ha due svantaggi:

  1. non è tollerante ai guasti
  2. il riequilibrio di gruppo non funziona come previsto

(1) Poiché gli offset non vengono mai commessi, un utente in errore e uno arrestato vengono gestiti allo stesso modo al riavvio. In entrambi i casi, l'argomento verrà consumato dal suo inizio. (2) Poiché l'offset non viene mai eseguito, sul ribilanciamento le partizioni appena assegnate saranno consumer fin dall'inizio.

Pertanto, questa strategia funziona solo per gruppi di consumatori con un singolo consumatore e deve essere utilizzata solo a fini di sviluppo.

Riutilizza lo stesso ID di gruppo e conferma

Se si desidera essere tolleranti ai guasti e / o utilizzare più utenti nel proprio gruppo di consumatori, è obbligatorio eseguire l'offset. Pertanto, se si desidera leggere un argomento sin dall'inizio, è necessario modificare gli offset commessi all'avvio del consumer. Per questo, KafkaConsumer fornisce tre metodi seek() , seekToBeginning() e seekToEnd() . Mentre seek() può essere usato per impostare un offset arbitrario, il secondo e il terzo metodo possono essere utilizzati rispettivamente per cercare l'inizio o la fine di una partizione. Pertanto, in caso di fallimento e di ricerca al riavvio del consumatore verrebbe omessa e il consumatore può riprendere da dove è rimasto. Per il consumer-stop-and-restart-from-beginning, seekToBeginning() verrebbe chiamato esplicitamente prima di inserire il ciclo di poll() . Nota che seekXXX() può essere utilizzato solo dopo che un utente è entrato in un gruppo, quindi è necessario eseguire un "dummy-poll" prima di utilizzare seekXXX() . Il codice generale sarebbe qualcosa del genere:

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow