Поиск…


Вступление

В этом разделе показано, как создавать и использовать записи на Java.

SimpleConsumer (Kafka> = 0.9.0)

В выпуске «Кафка», представленном в 0.9 году, был внесен полный редизайн потребителя кафки. Если вас интересует старый SimpleConsumer (0.8.X), ознакомьтесь с этой страницей . Если ваша установка Kafka новее 0.8.X, следующие коды должны работать из коробки.

Конфигурация и инициализация

Kafka 0.9 больше не поддерживает Java 6 или Scala 2.9. Если вы все еще находитесь на Java 6, рассмотрите возможность обновления до поддерживаемой версии.

Во-первых, создайте проект maven и добавьте следующую зависимость в ваш pom:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

Примечание . Не забудьте обновить поле версии для последних версий (теперь> 0.10).

Пользователь инициализируется с использованием объекта Properties . Существует множество свойств, позволяющих точно настроить поведение потребителей. Ниже приведена минимальная конфигурация:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers являются исходным списком брокеров для того, чтобы потребитель мог обнаружить остальную часть кластера. Это не обязательно должно быть все серверы в кластере: клиент будет определять полный набор живых брокеров от брокеров в этом списке.

deserializer сообщает потребителю, как интерпретировать / десериализовать ключи и значения сообщений. Здесь мы используем встроенный StringDeserializer .

Наконец, group.id соответствует группе потребителей этого клиента. Помните: все потребители группы потребителей будут разделять между собой сообщения (kafka действует как очередь сообщений), в то время как потребители из разных групп потребителей получат одинаковые сообщения (kafka действует как система публикации-подписки).

Другими полезными свойствами являются:

  • auto.offset.reset : контролирует, что делать, если смещение, хранящееся в Zookeeper, либо отсутствует, либо выходит за пределы диапазона. Возможные значения - latest и earliest . Все остальное выкинет исключение;

  • enable.auto.commit : если true (по умолчанию), потребительское смещение периодически (см. auto.commit.interval.ms ) сохраняется в фоновом режиме. Установка его в значение false и использование auto.offset.reset=earliest - это определить, откуда должен начинать потребитель, если не будет обнаружена auto.offset.reset=earliest либо зафиксированная информация о смещении. earliest с начала назначенного раздела раздела. latest средства с самого большого количества доступных фиксированных смещений для раздела. Тем не менее, потребитель Kafka всегда будет возвращаться из последнего зафиксированного смещения, если будет найдена действительная запись смещения (т.е. игнорирование auto.offset.reset . Лучшим примером является то, что новая группа потребителей подписывается на тему. auto.offset.reset чтобы определить, следует ли начинать с самого начала (самое раннее) или с конца (последнего) темы.

  • session.timeout.ms : таймаут сеанса гарантирует, что блокировка будет выпущена, если потребитель сработает или если сетевой раздел изолирует пользователя от координатора. В самом деле:

    Когда часть группы потребителей, каждому потребителю назначается подмножество разделов из тем, на которые он подписал. Это в основном групповой замок на этих разделах. Пока блокировка удерживается, никакие другие члены группы не смогут прочитать их. Когда ваш потребитель здоров, это именно то, что вы хотите. Это единственный способ избежать дублирования потребления. Но если потребитель умирает из-за сбоя машины или приложения, вам нужно, чтобы блокировка была выпущена, чтобы разделы могли быть назначены здоровому участнику. источник

Полный список свойств доступен здесь http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Потребительское творчество

Когда у нас есть свойства, создание потребителя легко:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

После того, как вы подписаны, потребитель может координировать свою работу с остальной группой, чтобы получить назначение раздела. Все это обрабатывается автоматически, когда вы начинаете потреблять данные.

Основной опрос

Потребитель должен иметь возможность получать данные параллельно, потенциально из многих разделов для многих тем, которые, вероятно, распространяются на многих брокеров. К счастью, все это обрабатывается автоматически, когда вы начинаете потреблять данные. Для этого все, что вам нужно сделать, это poll вызовов в цикле, и потребитель обращается с остальными.

poll возвращает (возможно, пустой) набор сообщений из назначенных разделов.

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

Код

Основной пример

Это самый простой код, который вы можете использовать для извлечения сообщений из темы kafka.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

Пример Runnable

Потребитель предназначен для запуска в своей собственной нити. Это небезопасно для многопоточного использования без внешней синхронизации, и, вероятно, это не очень хорошая идея.

Ниже приведена простая задача Runnable, которая инициализирует пользователя, подписывается на список тем и выполняет цикл опроса неограниченно до выключения извне.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

Обратите внимание, что во время опроса мы используем тайм-аут Long.MAX_VALUE , поэтому он будет ждать бесконечно для нового сообщения. Чтобы правильно закрыть пользователя, важно вызвать его метод shutdown() перед завершением работы приложения.

Драйвер может использовать его следующим образом:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0,9)

Конфигурация и инициализация

Во-первых, создайте проект maven и добавьте следующую зависимость в ваш pom:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

Проигрыватель инициализируется с использованием объекта Properties . Существует множество свойств, позволяющих точно настроить поведение производителя. Ниже приведена минимальная конфигурация:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers являются исходным списком одного или нескольких брокеров, чтобы производитель мог обнаружить остальную часть кластера. Свойства serializer сообщают Kafka, что ключ и значение сообщения должны быть закодированы. Здесь мы отправим строковые сообщения. Хотя это и не требуется, всегда рекомендуется установить client.id так как это позволяет вам легко сопоставлять запросы с брокером с экземпляром клиента, который его создал.

Другие интересные свойства:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

Вы можете контролировать долговечность сообщений, написанных на Kafka, через настройку acks . Значение по умолчанию «1» требует явного подтверждения от лидера раздела, что запись выполнена. Самая сильная гарантия, которую предоставляет Kafka, - это acks=all , что гарантирует, что лидер разделов не только принимал запись, но и был успешно реплицирован во все синхронизированные реплики. Вы также можете использовать значение «0», чтобы максимизировать пропускную способность, но у вас не будет гарантии, что сообщение было успешно записано в журнал брокера, поскольку брокер даже не отправляет ответ в этом случае.

retries (по умолчанию -> 0) определяет, пытается ли продюсер повторно отправить сообщение после сбоя. Обратите внимание, что при повторных попытках> 0 может возникнуть переупорядочение сообщений, поскольку повторная попытка может произойти после того, как будет выполнена следующая запись.

Производители Kafka пытаются собрать отправленные сообщения в партии для повышения пропускной способности. С помощью клиента Java вы можете использовать batch.size для управления максимальным размером в байтах каждой партии сообщений. Чтобы дать больше времени для заполнения партий, вы можете использовать linger.ms для отправки задержки производителя. Наконец, сжатие можно включить с помощью параметра compression.type .

Используйте buffer.memory чтобы ограничить общую память, доступную для клиента Java, для сбора неотправленных сообщений. Когда этот предел будет max.block.ms , продюсер будет блокировать дополнительные max.block.ms до тех пор, пока max.block.ms не будет использоваться для создания исключения. Кроме того, чтобы избежать ведение записей в очереди на неопределенный срок, вы можете установить таймаут, используя request.timeout.ms .

Полный список свойств доступен здесь . Я предлагаю прочитать эту статью из Confluent для более подробной информации.

Отправка сообщений

Метод send() является асинхронным. При вызове он добавляет запись в буфер отложенных сообщений и сразу возвращается. Это позволяет производителю объединить отдельные записи для повышения эффективности.

Результатом отправки является RecordMetadata указывающая раздел, на который была отправлена ​​запись, и смещение, которое было назначено. Поскольку вызов отправки асинхронен, он возвращает Future для RecordMetadata, которое будет присвоено этой записи. Чтобы проконсультироваться с метаданными, вы можете либо вызвать get() , который будет блокироваться до тех пор, пока запрос не завершится или не будет использовать обратный вызов.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

Код

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow