apache-kafka
Javaのプロデューサ/コンシューマ
サーチ…
前書き
このトピックでは、Javaでレコードを生成して使用する方法を示します。
SimpleConsumer(Kafka> = 0.9.0)
カフカの0.9リリースは、カフカ消費者の完全な再設計をもたらしました。あなたが古いSimpleConsumer (0.8.X)に興味があるなら、 このページを見てください 。 Kafkaのインストールが0.8.Xより新しい場合、以下のコードがそのまま使用できます。
構成と初期化
Kafka 0.9はJava 6やScala 2.9をサポートしなくなりました。 Java 6を使用している場合は、サポートされているバージョンにアップグレードすることを検討してください。
まず、mavenプロジェクトを作成し、pomに次の依存関係を追加します。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
注 :最新リリースのバージョンフィールドを更新することを忘れないでください(今は> 0.10)。
コンシューマは、 Propertiesオブジェクトを使用して初期化されます。消費者の行動を微調整するための多くのプロパティがあります。以下は必要最小限の設定です:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
bootstrap-serversは、消費者が残りのクラスターを検出できるブローカーの初期リストです。これは、クラスタ内のすべてのサーバである必要はありません。クライアントは、このリストのブローカからアライブブローカのフルセットを判断します。
deserializerは、メッセージキーと値を解釈/逆シリアル化する方法を消費者に通知します。ここでは、組み込みのStringDeserializerを使用しStringDeserializer 。
最後に、 group.idはこのクライアントのコンシューマ・グループに対応します。覚えておいてください:コンシューマ・グループのすべてのコンシューマは、それらの間でメッセージを分割します(カフカはメッセージ・キューのように動作します)。異なるコンシューマ・グループのコンシューマは、同じメッセージ(パブリッシュ・サブスクライブ・システムのように動作するカフカ)を取得します。
その他の有用なプロパティは次のとおりです。
auto.offset.reset:Zookeeperに格納されているオフセットが不足しているか、範囲外である場合のauto.offset.reset制御します。可能な値はlatestとearliestです。それ以外の場合は例外がスローされます。enable.auto.commit:true(デフォルト)の場合、コンシューマオフセットはバックグラウンドで定期的に保存されます(auto.commit.interval.ms参照)。false設定し、auto.offset.reset=earliestを使用すると、コミットされたオフセット情報が見つからない場合に消費者がどこから開始すべきかを判断することができます。earliestは、割り当てられたトピックパーティションの先頭からのことです。latest意味は、パーティションに対して利用可能なコミットされたオフセットの最大数からです。しかし、Kafkaのコンシューマは、有効なオフセットレコードが見つかった場合(つまり、auto.offset.reset無視する)、最後にコミットされたオフセットから再開しauto.offset.reset。新しいコンシューマグループがトピックにサブスクライブする場合が最も良い例です。auto.offset.resetを使用して、トピックの最初(最古)または最後(最新)から開始するかどうかを決定します。session.timeout.ms:セッションタイムアウトは、コンシューマがクラッシュした場合、またはネットワークパーティションがコンシューマからコンシューマを隔離した場合に、ロックが解除されることを保証します。確かに:コンシューマ・グループの一部である場合、各コンシューマには、サブスクライブしているトピックからパーティションのサブセットが割り当てられます。これは基本的にこれらのパーティションのグループロックです。ロックが保持されている限り、グループ内の他のメンバーはそのメンバーから読み取ることができません。あなたの消費者が健康であれば、これはまさにあなたが望むものです。これは、重複した消費を避けることができる唯一の方法です。しかし、マシンやアプリケーションの障害のために消費者が死亡した場合、パーティションを正常なメンバーに割り当てることができるようにロックを解除する必要があります。 ソース
プロパティの完全なリストはhttp://kafka.apache.org/090/documentation.html#newconsumerconfigsにあります。
コンシューマ作成とトピック購読
いったんプロパティを取得すると、コンシューマを簡単に作成できます。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );
サブスクライブした後、コンシューマは残りのグループと調整してパーティション割り当てを取得できます。これは、データの使用を開始すると自動的に処理されます。
基本的な投票
消費者は、多くのブローカーに広がっている可能性のある多くのトピックについて、多くのパーティションから並列にデータをフェッチできる必要があります。幸いにも、これはデータの消費を開始すると自動的に処理されます。これを行うには、ループでpollを呼び出し、消費者が残りを処理するだけです。
pollは、割り当てられたパーティションからのメッセージ(おそらく空の)を返します。
while( true ){
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( !records.isEmpty() ){
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
コード
基本的な例
これは、カフカのトピックからメッセージを取得するために使用できる最も基本的なコードです。
public class ConsumerExample09{
public static void main( String[] args ){
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "auto.offset.reset", "earliest" );
props.put( "enable.auto.commit", "false" );
props.put( "group.id", "octopus" );
try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
consumer.subscribe( Collections.singletonList( "test-topic" ) );
while( true ){
// poll with a 100 ms timeout
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( records.isEmpty() ) continue;
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
}
}
実行可能な例
消費者は、自分のスレッドで実行されるように設計されています。外部同期なしではマルチスレッドでの使用は安全ではありません。試してみるのは良い考えではありません。
以下は、コンシューマを初期化し、トピックのリストにサブスクライブし、外部でシャットダウンするまでポーリングループを無期限に実行する単純なRunnableタスクです。
public class ConsumerLoop implements Runnable{
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop( int id, String groupId, List<String> topics ){
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092");
props.put( "group.id", groupId );
props.put( "auto.offset.reset", "earliest" );
props.put( "key.deserializer", StringDeserializer.class.getName() );
props.put( "value.deserializer", StringDeserializer.class.getName() );
this.consumer = new KafkaConsumer<>( props );
}
@Override
public void run(){
try{
consumer.subscribe( topics );
while( true ){
ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}catch( WakeupException e ){
// ignore for shutdown
}finally{
consumer.close();
}
}
public void shutdown(){
consumer.wakeup();
}
}
ポーリング中はLong.MAX_VALUEタイムアウトを使用するので、新しいメッセージが無期限に待機することに注意してください。コンシューマを適切に閉じるには、アプリケーションを終了する前にshutdown()メソッドを呼び出すことが重要です。
ドライバは次のように使用することができます:
public static void main( String[] args ){
int numConsumers = 3;
String groupId = "octopus";
List<String> topics = Arrays.asList( "test-topic" );
ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
final List<ConsumerLoop> consumers = new ArrayList<>();
for( int i = 0; i < numConsumers; i++ ){
ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
consumers.add( consumer );
executor.submit( consumer );
}
Runtime.getRuntime().addShutdownHook( new Thread(){
@Override
public void run(){
for( ConsumerLoop consumer : consumers ){
consumer.shutdown();
}
executor.shutdown();
try{
executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
}catch( InterruptedException e ){
e.printStackTrace();
}
}
} );
}
SimpleProducer(kafka> = 0.9)
構成と初期化
まず、mavenプロジェクトを作成し、pomに次の依存関係を追加します。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
プロデューサは、 Propertiesオブジェクトを使用して初期化されます。プロデューサーの振る舞いを微調整できる多くのプロパティがあります。以下は必要最小限の設定です:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
bootstrap-serversは、残りのクラスターを検出できるようにする1つ以上のブローカーの初期リストです。 serializerプロパティは、メッセージキーと値をどのようにエンコードするかをKafkaに伝えます。ここでは、文字列メッセージを送信します。必須ではありませんが、いつでもclient.id設定することをお勧めします。これにより、ブローカ上の要求を、それを作成したクライアントインスタンスに簡単に関連付けることができます。
その他の興味深いプロパティは次のとおりです。
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
あなたはacks設定を通してKafkaに書かれたメッセージの耐久性を制御することができます。デフォルト値の "1"は、書き込みが成功したというパーティションリーダーからの明示的な確認を必要とします。 Kafkaが提供する最も強力な保証はacks=allで、パーティションリーダーが書き込みを受け入れるだけでなく、すべての同期レプリカに正常に複製されたことを保証します。 "0"の値を使用してスループットを最大化することもできますが、ブローカはこの場合でも応答を送信しないため、メッセージがブローカのログに正常に書き込まれたという保証はありません。
retries (デフォルトは> 0)は、プロデューサが失敗後に再送を試みるかどうかを決定します。 retries> 0では、次の書き込みが成功した後に再試行が行われる可能性があるため、メッセージの順序が変更されることがあります。
カフカの生産者は、送信されたメッセージをバッチに収集してスループットを向上させようとします。 Javaクライアントでは、 batch.sizeを使用して、各メッセージ・バッチの最大サイズをバイト単位で制御できます。バッチがいっぱいになるまでの時間をlinger.msには、 linger.msを使用してプロデューサに送信遅延を持たせることができます。最後に、compression.typeの設定でcompression.type有効にすることができます。
buffer.memoryを使用して、未送信メッセージを収集するためにJavaクライアントが使用できる合計メモリーを制限します。この制限にmax.block.msと、プロデューサは例外を発生さmax.block.ms前にmax.block.ms間だけ追加の送信をブロックします。さらに、レコードが無期限にキューに入れられないようにするには、 request.timeout.msを使用してタイムアウトを設定します。
プロパティの完全なリストはここで利用できます 。私はConfluentからこの記事を読むことをお勧めします。
メッセージを送信する
send()メソッドは非同期です。呼び出されると、保留中のレコード送信のバッファにレコードが追加され、すぐに返されます。これにより、生産者は効率のために個々の記録をまとめてバッチ処理することができます。
sendの結果はレコードが送られたパーティションとそれが割り当てられたオフセットを指定するRecordMetadataです。送信呼び出しは非同期であるため、このレコードに割り当てられるRecordMetadataのFutureを返します。メタデータを参照するには、 get()呼び出すか、リクエストが完了するかコールバックを使用するまでブロックします。
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
コード
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}