apache-kafka
消費者グループとオフセット管理
サーチ…
パラメーター
| パラメータ | 説明 |
|---|---|
| group.id | コンシューマー・グループの名前。 |
| enable.auto.commit | 自動的にオフセットをコミットします。 デフォルト:true 。 |
| auto.commit.interval.ms | コミットするまでの最小遅延(ミリ秒)( enable.auto.commit=trueが必要)。 デフォルトは5000です。 |
| auto.offset.reset | 有効なコミットされたオフセットが見つからない場合の対処方法。 デフォルト:最新 (+) |
| (+)可能な値 | 説明 |
| 早い | 最も早いオフセットに自動的にオフセットをリセットします。 |
| 最新 | オフセットを最新のオフセットに自動的にリセットします。 |
| 無し | コンシューマのグループに対して以前のオフセットが見つからない場合、コンシューマに例外をスローします。 |
| 他に何か | 消費者に例外を投げる。 |
消費者グループとは
Kafka 0.9以降、新しい高水準KafkaConsumerクライアントが利用可能です。これは、いわゆるコンシューマー・グループで複数のコンシューマーを組み合わせることができる新しい組み込みカフカプロトコルを利用しています。コンシューマ・グループは、一連のトピックにサブスクライブする単一の論理コンシューマとして記述できます。すべてのトピックに関する部分は、グループ内の物理的な消費者に割り当てられ、各パテントはある消費者(単一の消費者が複数のパートを割り当てることができる)に割り当てられる。同じグループに属する個々の消費者は、分散して異なるホスト上で実行できます。
コンシューマー・グループはgroup.id識別されます。コンシューマ・グループの特定のクライアント・インスタンス・メンバーを作成するには、クライアントの構成を使用してgroup.idグループをこのクライアントに割り当てるだけで十分です。
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
したがって、同じカフカクラスタに接続し、同じgroup.idを使用するすべてのコンシューマがコンシューマグループを形成します。消費者はいつでもグループを離れることができ、新しい消費者はいつでもグループに加わることができます。どちらの場合も、いわゆる再調整がトリガーされ、パーティションがコンシューマーグループに再割り当てされ、各パーティションがグループ内のある消費者によって確実に処理されます。
単一のKafkaConsumerであっても、それ自体が単一のメンバーである消費者グループを形成していることに注意してください。
消費者オフセット管理とフォールトトレランス
KafkaConsumersは、 poll()呼び出しを介してKafkaブローカーからのメッセージを要求し、その進行状況はオフセットを介して追跡されます。各トピックの各パーティション内の各メッセージには、オフセットが割り当てられています。つまり、パーティション内にその論理シーケンス番号が割り当てられています。 KafkaConsumerは、割り当てられている各パーティションの現在のオフセットを追跡します。カフカのブローカーは、現在の消費者のオフセットを認識していないことに注意してください。したがって、 poll() 、消費者は現在のオフセットをブローカに送信して、ブローカが対応するメッセージを返すことができるようにする必要があります。より大きな連続オフセットを持つメッセージ。たとえば、単一のパーティショントピックと現在のオフセット5を持つ単一のコンシューマがあると仮定します。poll poll() 、コンシューマはオフセットをブローカに送信し、ブローカはオフセット6,7,8、...
消費者は自分自身でオフセットを追跡するので、消費者が失敗するとこの情報が失われる可能性があります。したがって、オフセットは確実に格納されなければならず、再起動すると、消費者はその古いオフセットとリサウンサを拾うことができるようになる。カフカには、 オフセットコミットを介した組み込みのサポートがあります。新しいKafkaConsumerは現在のオフセットをKafkaにコミットし、Kafkaはそのオフセットを__consumer_offsetsという特別なトピックに__consumer_offsetsます。 Kafkaのトピックにオフセットを格納するのは、フォールトトレラントなだけでなく、再バランス中に他のコンシューマにパーティションを再割り当てすることもできます。コンシューマ・グループのすべてのコンシューマはすべてのパーティションのすべてのコミット済みオフセットにアクセスできるため、 __consumer_offsets新しいパーティションを割り当てられたコンシューマは、このパーティションのコミットされたオフセットを__consumer_offsetsトピックから読み込み、古いコンシューマが残した場所から再開します。
オフセットをコミットする方法
KafkaConsumersは、バックグラウンドで自動的にオフセットをコミットできます(設定パラメータenable.auto.commit = true )。デフォルト設定は何enable.auto.commit = trueこれらの自動コミットはpoll() ( 通常はループで呼ばれます )内poll()実行されます。オフセットをコミットする頻度は、 auto.commit.interval.msで設定できます。自動コミットはpoll()組み込まれ、 poll()はユーザーコードによって呼び出されるため、このパラメーターはコミット間インターバルの下限を定義します。
自動コミットの代わりに、オフセットを手動で管理することもできます。そのためには、自動コミットを無効にする必要があります( enable.auto.commit = false )。手動コミットの場合、 KafkaConsumersはcommitSync()とcommitAsync()という2つのメソッドがあります。名前が示すように、 commitSync()はブロッキング呼び出しであり、オフセットが正常にコミットされた後に戻り、 commitAsync()はすぐに戻ります。コミットが成功したかどうかを知りたい場合は、コールバックハンドラ( OffsetCommitCallback )にメソッドパラメータを指定できます。どちらのコミット呼び出しでも、最新のpoll()呼び出しのオフセットをコミットすることに注意してください。例えば。単一のコンシューマを持つ単一パーティションのトピックを想定し、 poll()最後の呼び出しはオフセット4,5,6を持つメッセージを返します。コミット時には、コンシューマクライアントによって追跡される最新のオフセットであるため、オフセット6がコミットされます。同時にcommitSync()とcommitAsync()両方で、コミットするオフセットをより詳細に制御できます。 Map<TopicPartition, OffsetAndMetadata>を指定できる対応するオーバーロードを使用すると、コンシューマは指定されたオフセットのみをコミットします(マップには割り当てられたパーティションのサブセットが含まれていてもよく、指定されたオフセットには任意の値を設定できます)。
コミットされたオフセットのセマンティクス
コミットされたオフセットは、このオフセットまでのすべてのメッセージが既に処理されたことを示します。したがって、オフセットは連続した数値なので、オフセットXコミットすると、 Xより小さいすべてのオフセットが暗黙的にコミットされます。したがって、個々のオフセットを個別にコミットする必要はなく、一度に複数のオフセットをコミットするだけで、最大のオフセットをコミットすることができます。
設計上、最後にコミットされたオフセットより小さなオフセットをコミットすることも可能であることに注意してください。これは、メッセージをもう一度読み込む必要がある場合に実行できます。
処理保証
自動コミットを使用すると、少なくとも一回処理のセマンティクスが提供されます。根底にある仮定は、以前に配信されたすべてのメッセージが正常に処理された後にのみpoll()が呼び出されるということです。これにより、処理後にコミットが発生するため、メッセージが失われることはありません。コミット前にコンシューマが失敗すると、最後のコミット後のすべてのメッセージがKafkaから受信され、再度処理されます。ただし、最後のpoll()呼び出しからのメッセージが処理された可能性がありますが、自動コミット呼び出しの直前にエラーが発生したため、この再試行で重複が発生する可能性があります。
一度に処理するセマンティクスが必要な場合は、自動コミットを無効にして、 poll()直後に手動でcommitSync()実行する必要があります。その後、メッセージが処理されます。これにより、メッセージが処理される前にコミットされるので、2度目の読み取りは行われません。もちろん、失敗した場合にメッセージが失われる可能性があります。
どのように私は最初からトピックを読むことができますか
トピックを最初から読み込むには、複数の戦略があります。それらを説明するには、まず消費者の立ち上げ時に何が起こるのかを理解する必要があります。消費者の立ち上げ時に、以下のことが起こります:
- 構成されたコンシューマ・グループに参加すると、リバランスがトリガーされ、コンシューマにパーティションが割り当てられます。
- コミットされたオフセットを探します(コンシューマに割り当てられたすべてのパーティション)
- 有効なオフセットを持つすべてのパーティションで、このオフセットから再開する
- 有効なオフセットでないすべてのパーティションに対して、
auto.offset.reset設定パラメータに従って開始オフセットを設定する
新しいコンシューマーグループを開始する
最初からトピックを処理したい場合は、新しいコンシューマグループを開始する(つまり、未使用のgroup.id選択する)だけで、 auto.offset.reset = earliestを設定することができます。新しいグループにコミットされたオフセットはないため、自動オフセットリセットがトリガーされ、トピックは最初から消費されます。コンシューマーの再起動時に、同じgroup.id再度使用すると、最初からトピックを読み取ることはなく、残した場所から再開することに注意してください。したがって、この戦略では、最初からトピックを読み込むたびに新しいgroup.idを割り当てる必要があります。
同じグループIDを再利用する
最初からトピックを読み取るたびに新しいgroup.id設定しないようにするには、初めて(unused group.idを使用して)コンシューマーを開始する前にenable.auto.commit = falseを使用して自動コミットを無効にすることができますgroup.idとauto.offset.reset = earliestを設定します)。さらに、オフセットを手動でコミットしないでください。この戦略を使用してオフセットは決してコミットされないため、コンシューマは再起動時にトピックを最初から再度読み込みます。
しかし、この戦略には2つの欠点があります。
- フォールトトレラントではありません
- グループの再バランスが意図したとおりに機能しない
(1)オフセットは決してコミットされないため、失敗したコンシューマと停止したコンシューマは、再起動時に同じ方法で処理されます。どちらの場合も、トピックは最初から消費されます。 (2)オフセットは決してコミットされないので、リバランスに新たに割り当てられたパーティションは最初から消費者になります。
したがって、この戦略は、単一のコンシューマを持つコンシューマ・グループに対してのみ機能し、開発目的でのみ使用する必要があります。
同じグループIDとコミットを再利用する
フォールトトレラントで、コンシューマ・グループで複数のコンシューマを使用したい場合は、オフセットをコミットする必要があります。したがって、最初からトピックを読みたい場合は、コンシューマの起動時にコミットされたオフセットを操作する必要があります。このために、 KafkaConsumerはseek() 、 seekToBeginning() 、およびseekToEnd() 3つのメソッドseek()提供します。 seek()を使用して任意のオフセットを設定することができますが、2番目と3番目の方法はそれぞれパーティションの先頭または末尾まで検索するために使用できます。したがって、失敗した場合および消費者の再開時には、シークは省略され、消費者はその出発地から再開することができる。コンシューマの停止から再開までの最初の段階では、 poll()ループを開始する前にseekToBeginning()が明示的に呼び出されます。 seekXXX()は、コンシューマがグループに参加した後でなければ使用できないため、 seekXXX()を使用する前に "ダミーポール"を実行する必要があります。全体的なコードは次のようなものになります。
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}