apache-kafka
Productor / Consumidor en Java
Buscar..
Introducción
Este tema muestra cómo producir y consumir registros en Java.
SimpleConsumer (Kafka> = 0.9.0)
La versión 0.9 de Kafka introdujo un rediseño completo del consumidor kafka. Si está interesado en el antiguo SimpleConsumer (0.8.X), eche un vistazo a esta página . Si su instalación de Kafka es más reciente que 0.8.X, los siguientes códigos deberían funcionar de manera inmediata.
Configuración e inicialización
Kafka 0.9 ya no es compatible con Java 6 o Scala 2.9. Si aún está en Java 6, considere actualizar a una versión compatible.
Primero, cree un proyecto de Maven y agregue la siguiente dependencia en su pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
Nota : no olvide actualizar el campo de versión para las últimas versiones (ahora> 0.10).
El consumidor se inicializa utilizando un objeto Properties . Hay muchas propiedades que le permiten ajustar el comportamiento del consumidor. A continuación se muestra la configuración mínima necesaria:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Los bootstrap-servers son una lista inicial de agentes para que el consumidor pueda descubrir el resto del clúster. No es necesario que sean todos los servidores del clúster: el cliente determinará el conjunto completo de intermediarios activos de los intermediarios en esta lista.
El deserializer le dice al consumidor cómo interpretar / deserializar las claves y valores del mensaje. Aquí, usamos el StringDeserializer .
Finalmente, el group.id corresponde al grupo de consumidores de este cliente. Recuerde: todos los consumidores de un grupo de consumidores dividirán los mensajes entre ellos (kafka actuando como una cola de mensajes), mientras que los consumidores de diferentes grupos de consumidores recibirán los mismos mensajes (kafka actuando como un sistema de publicación-suscripción).
Otras propiedades útiles son:
auto.offset.reset: controla qué hacer si el desplazamiento almacenado en Zookeeper falta o está fuera de rango. Los valores posibles son loslatesty losearliest. Cualquier otra cosa lanzará una excepción;enable.auto.commit: si estrue(predeterminado), el desplazamiento del consumidor seauto.commit.interval.msperiódicamente (verauto.commit.interval.ms) guardado en el fondo. Establecerlo enfalsey usarauto.offset.reset=earliest- es para determinar desde dónde debe comenzar el consumidor en caso de que no se encuentre información de compensación confirmada.earliestmedios desde el inicio de la partición del tema asignado.latestmedios del mayor número de compensaciones comprometidas disponibles para la partición. Sin embargo, el consumidor de Kafka siempre se reanudará desde el último desplazamiento confirmado siempre que se encuentre un registro de desplazamiento válido (es decir, ignorandoauto.offset.reset. El mejor ejemplo es cuando un grupo de consumidores nuevo se suscribe a un tema. Esto es cuando se usaauto.offset.resetpara determinar si comenzar desde el principio (más temprano) o el final (más reciente) del tema.session.timeout.ms: un tiempo de espera de sesión asegura que el bloqueo se liberará si el consumidor se bloquea o si una partición de red aísla al consumidor del coordinador. En efecto:Cuando forman parte de un grupo de consumidores, a cada consumidor se le asigna un subconjunto de las particiones de los temas a los que se ha suscrito. Esto es básicamente un bloqueo de grupo en esas particiones. Mientras se mantenga el bloqueo, ningún otro miembro del grupo podrá leer de ellos. Cuando su consumidor está sano, esto es exactamente lo que quiere. Es la única forma de evitar el consumo duplicado. Pero si el consumidor muere debido a una falla de la máquina o la aplicación, necesita que se libere ese bloqueo para que las particiones puedan asignarse a un miembro sano. fuente
La lista completa de propiedades está disponible aquí http://kafka.apache.org/090/documentation.html#newconsumerconfigs .
Creación del consumidor y suscripción al tema.
Una vez que tenemos las propiedades, crear un consumidor es fácil:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );
Una vez que se haya suscrito, el consumidor puede coordinar con el resto del grupo para obtener su asignación de partición. Todo esto se maneja automáticamente cuando empiezas a consumir datos.
Encuesta basica
El consumidor debe poder obtener datos en paralelo, potencialmente de muchas particiones para muchos temas que probablemente se distribuyen entre muchos corredores. Afortunadamente, todo esto se maneja automáticamente cuando comienza a consumir datos. Para hacer eso, todo lo que necesita hacer es llamar a la poll en un bucle y el consumidor se encarga del resto.
poll devuelve un conjunto (posiblemente vacío) de mensajes de las particiones que fueron asignadas.
while( true ){
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( !records.isEmpty() ){
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
El código
Ejemplo basico
Este es el código más básico que puede usar para obtener mensajes de un tema kafka.
public class ConsumerExample09{
public static void main( String[] args ){
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "auto.offset.reset", "earliest" );
props.put( "enable.auto.commit", "false" );
props.put( "group.id", "octopus" );
try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
consumer.subscribe( Collections.singletonList( "test-topic" ) );
while( true ){
// poll with a 100 ms timeout
ConsumerRecords<String, String> records = consumer.poll( 100 );
if( records.isEmpty() ) continue;
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}
}
}
Ejemplo ejecutable
El consumidor está diseñado para ejecutarse en su propio hilo. No es seguro para uso multiproceso sin sincronización externa y probablemente no sea una buena idea intentarlo.
A continuación se muestra una tarea sencilla de Ejecutar que inicializa al consumidor, se suscribe a una lista de temas y ejecuta el bucle de sondeo indefinidamente hasta que se apaga externamente.
public class ConsumerLoop implements Runnable{
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop( int id, String groupId, List<String> topics ){
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092");
props.put( "group.id", groupId );
props.put( "auto.offset.reset", "earliest" );
props.put( "key.deserializer", StringDeserializer.class.getName() );
props.put( "value.deserializer", StringDeserializer.class.getName() );
this.consumer = new KafkaConsumer<>( props );
}
@Override
public void run(){
try{
consumer.subscribe( topics );
while( true ){
ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
}
}catch( WakeupException e ){
// ignore for shutdown
}finally{
consumer.close();
}
}
public void shutdown(){
consumer.wakeup();
}
}
Tenga en cuenta que usamos un tiempo de espera de Long.MAX_VALUE durante la encuesta, por lo que esperará indefinidamente un mensaje nuevo. Para cerrar correctamente el consumidor, es importante llamar a su método shutdown() antes de finalizar la aplicación.
Un conductor podría usarlo así:
public static void main( String[] args ){
int numConsumers = 3;
String groupId = "octopus";
List<String> topics = Arrays.asList( "test-topic" );
ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
final List<ConsumerLoop> consumers = new ArrayList<>();
for( int i = 0; i < numConsumers; i++ ){
ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
consumers.add( consumer );
executor.submit( consumer );
}
Runtime.getRuntime().addShutdownHook( new Thread(){
@Override
public void run(){
for( ConsumerLoop consumer : consumers ){
consumer.shutdown();
}
executor.shutdown();
try{
executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
}catch( InterruptedException e ){
e.printStackTrace();
}
}
} );
}
SimpleProducer (kafka> = 0.9)
Configuración e inicialización
Primero, cree un proyecto de Maven y agregue la siguiente dependencia en su pom:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
El productor se inicializa utilizando un objeto Properties . Hay muchas propiedades que le permiten ajustar el comportamiento del productor. A continuación se muestra la configuración mínima necesaria:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
Los bootstrap-servers son una lista inicial de uno o más intermediarios para que el productor pueda descubrir el resto del clúster. Las propiedades del serializer dicen a Kafka cómo deben codificarse la clave y el valor del mensaje. Aquí, le enviaremos mensajes de cadena. Aunque no es obligatorio, siempre se recomienda establecer un client.id : esto le permite correlacionar fácilmente las solicitudes en el agente con la instancia del cliente que lo creó.
Otras propiedades interesantes son:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
Puede controlar la durabilidad de los mensajes escritos en Kafka a través de la configuración de acks . El valor predeterminado de "1" requiere un reconocimiento explícito del líder de la partición de que la escritura se realizó correctamente. La garantía más acks=all que ofrece Kafka es con acks=all , lo que garantiza que el líder de la partición no solo aceptó la escritura, sino que se replicó con éxito en todas las réplicas sincronizadas. También puede usar un valor de "0" para maximizar el rendimiento, pero no tendrá ninguna garantía de que el mensaje se haya escrito correctamente en el registro del agente, ya que el agente ni siquiera envía una respuesta en este caso.
retries (predeterminados a> 0) determinan si el productor intenta reenviar el mensaje después de una falla. Tenga en cuenta que con los reintentos> 0, la reordenación de mensajes puede ocurrir ya que el reintento puede ocurrir después de que una escritura siguiente haya tenido éxito.
Los productores de Kafka intentan recopilar los mensajes enviados en lotes para mejorar el rendimiento. Con el cliente Java, puede usar batch.size para controlar el tamaño máximo en bytes de cada lote de mensajes. Para dar más tiempo para que se llenen los lotes, puede usar linger.ms para que el productor retrase el envío. Finalmente, la compresión se puede habilitar con la configuración de tipo de compression.type .
Use buffer.memory para limitar la memoria total que está disponible para el cliente Java para recopilar mensajes no enviados. Cuando se max.block.ms este límite, el productor bloqueará los envíos adicionales durante el tiempo max.block.ms antes de generar una excepción. Además, para evitar que los registros se pongan en cola indefinidamente, puede establecer un tiempo de espera con request.timeout.ms .
La lista completa de propiedades está disponible aquí . Sugiero leer este artículo de Confluent para más detalles.
Enviando mensajes
El método send() es asíncrono. Cuando se llama, agrega el registro a un búfer de envíos de registros pendientes y se devuelve inmediatamente. Esto permite al productor agrupar registros individuales para la eficiencia.
El resultado del envío es un RecordMetadata especifica la partición a la que se envió el registro y el desplazamiento que se le asignó. Dado que la llamada de envío es asíncrona, devuelve un Future para los Datos de Registro que se asignarán a este registro. Para consultar los metadatos, puede llamar a get() , que se bloqueará hasta completar la solicitud o usar una devolución de llamada.
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
El código
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}