수색…


소개

이 주제에서는 Java로 레코드를 생성하고 사용하는 방법을 보여줍니다.

SimpleConsumer (Kafka> = 0.9.0)

카프카 0.9 버전은 카프카 소비자를 완전히 재 설계했습니다. 오래된 SimpleConsumer (0.8.X)에 관심이 있으시면 이 페이지를보십시오 . Kafka 설치가 0.8.X보다 새로운 경우, 다음 코드가 즉시 작동합니다.

구성 및 초기화

Kafka 0.9는 Java 6 또는 Scala 2.9를 더 이상 지원하지 않습니다. 자바 6을 사용 중이라면 지원되는 버전으로 업그레이드하는 것이 좋습니다.

먼저, maven 프로젝트를 만들고 pom에 다음 종속성을 추가하십시오.

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

참고 : 최신 릴리스의 버전 필드를 업데이트하는 것을 잊지 마십시오 (현재 0.10 이상).

소비자는 Properties 객체를 사용하여 초기화됩니다. 소비자 행동을 미세 조정할 수있는 속성이 많이 있습니다. 다음은 필요한 최소 구성입니다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers 는 소비자가 나머지 클러스터를 검색 할 수 있도록하는 브로커의 초기 목록입니다. 이것은 클러스터의 모든 서버가 될 필요는 없습니다. 클라이언트는이 목록의 브로커에서 살아있는 브로커의 전체 세트를 판별합니다.

deserializer 는 소비자에게 메시지 키와 값을 해석 / 역 직렬화하는 방법을 알려줍니다. 여기서 우리는 내장 StringDeserializer 사용합니다.

마지막으로, group.id 는이 클라이언트의 소비자 그룹에 해당합니다. 기억하십시오. 소비자 그룹의 모든 소비자는 서로 다른 소비자 그룹의 소비자가 같은 메시지 (발행 - 구독 시스템처럼 행동하는 카프카)를 얻지 만 메시지 그룹 사이에서 메시지를 분리합니다.

기타 유용한 속성은 다음과 같습니다.

  • auto.offset.reset : 사육사에 저장된 오프셋이 누락되었거나 범위를 벗어난 경우 수행 할 작업을 제어합니다. 가능한 값은 latest 값과 earliest 값입니다. 다른 것은 예외를 던질 것이다.

  • enable.auto.commit : true (기본값)이면 소비자 오프셋은 백그라운드에 저장된 주기적으로 ( auto.commit.interval.ms 참조)입니다. false 설정하고 auto.offset.reset=earliest 사용 auto.offset.reset=earliest 커밋 된 오프셋 정보가없는 경우 소비자가 어디에서 시작해야하는지 결정할 수 있습니다. earliest 것은 할당 된 주제 파티션의 시작부터의 평균값입니다. latest 은 파티션에 대해 사용 가능한 확약 된 최대 오프셋 수로부터의 평균을의 L합니다. 그러나 Kafka 소비자는 유효한 오프셋 레코드가 발견되는 한 (즉, auto.offset.reset 무시하는 한) 마지막 커밋 된 오프셋에서부터 재개 할 것입니다. 가장 좋은 예는 새로운 소비자 그룹이 주제에 가입하는 경우입니다. auto.offset.reset 을 사용하여 주제의 시작 (가장 이른) 또는 끝 (최신) 중에서 시작 할지를 결정합니다.

  • session.timeout.ms : 세션 타임 아웃은 소비자가 충돌하거나 네트워크 파티션이 컨디셔너와 소비자를 분리 할 경우 잠금이 해제되도록합니다. 과연:

    소비자 그룹의 일부인 경우 각 소비자는 구독 한 주제에서 파티션의 서브 세트를 할당받습니다. 이것은 기본적으로 해당 파티션에 대한 그룹 잠금입니다. 자물쇠가 열리는 동안 그룹의 다른 구성원은 그 구성원을 읽을 수 없습니다. 당신의 소비자가 건강 할 때, 이것은 당신이 원하는 것입니다. 중복 소비를 피할 수있는 유일한 방법입니다. 그러나 컴퓨터 또는 응용 프로그램 오류로 인해 소비자가 사망하는 경우 파티션을 정상적인 구성원에게 할당 할 수 있도록 잠금을 해제해야합니다. 출처

전체 속성 목록은 http://kafka.apache.org/090/documentation.html#newconsumerconfigs에서 볼 수 있습니다.

소비자 창작 및 주제 구독

일단 우리가 속성을 갖게되면 소비자를 만드는 것이 쉽습니다.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

가입 한 후 소비자는 나머지 그룹과 조정하여 파티션 할당을받을 수 있습니다. 이것은 데이터를 소비하기 시작할 때 자동으로 처리됩니다.

기본 설문 조사

소비자는 많은 브로커에 걸쳐 퍼질 가능성이있는 많은 주제의 많은 파티션에서 잠재적으로 병렬로 데이터를 가져올 수 있어야합니다. 다행히도 데이터를 소비하기 시작할 때 자동으로 처리됩니다. 그렇게하기 위해, 당신이해야 할 일은 반복적으로 poll 을 호출하고 소비자가 나머지를 처리하는 것입니다.

poll 은 할당 된 파티션에서 메시지 집합을 반환합니다 (비어있는 경우도 있음).

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

코드

기본 예제

이것은 카프카 (kafka) 주제에서 메시지를 가져 오는 데 사용할 수있는 가장 기본적인 코드입니다.

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

실행 가능한 예제

소비자는 자체 스레드로 실행되도록 설계되었습니다. 외부 동기화없이 다중 스레드 사용에는 안전하지 않으므로 시도하지 않는 것이 좋습니다.

아래는 소비자를 초기화하고, 주제 목록을 구독하고, 외부 적으로 종료 할 때까지 무한정 폴링 루프를 실행하는 간단한 Runnable 작업입니다.

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }


    public void shutdown(){
        consumer.wakeup();
    }
}

폴링 중에는 Long.MAX_VALUE 의 타임 아웃을 사용하므로 새로운 메시지가 무기한 대기됩니다. 소비자를 제대로 닫으려면 응용 프로그램을 종료하기 전에 shutdown() 메서드를 호출하는 것이 중요합니다.

드라이버는 다음과 같이 사용할 수 있습니다.

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}

SimpleProducer (kafka> = 0.9)

구성 및 초기화

먼저, maven 프로젝트를 만들고 pom에 다음 종속성을 추가하십시오.

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

생성자는 Properties 객체를 사용하여 초기화됩니다. 생산자 행동을 미세 조정할 수있는 속성이 많이 있습니다. 다음은 필요한 최소 구성입니다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers 는 제작자가 나머지 클러스터를 검색 할 수있는 하나 이상의 브로커의 초기 목록입니다. serializer 속성은 Kafka에게 메시지 키와 값을 인코딩하는 방법을 알려줍니다. 여기서 우리는 문자열 메시지를 보낼 것입니다. 필수는 아니지만 항상 client.id 설정하는 것이 항상 권장됩니다. 이렇게하면 브로커의 요청을 클라이언트 인스턴스와 쉽게 상관시킬 수 있습니다.

다른 흥미로운 속성은 다음과 같습니다.

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

acks 설정을 통해 Kafka에 작성된 메시지내구성을 제어 할 수 있습니다. 기본값 "1"은 파티션 리더에서 쓰기가 성공했음을 명시 적으로 확인해야합니다. 카프카가 제공하는 가장 강력한 보장은 acks=all . 파티션 리더가 쓰기를 허용 할뿐만 아니라 모든 동기화 된 복제본에 성공적으로 복제되었습니다. "0"값을 사용하여 처리량을 최대화 할 수도 있지만 브로커가이 경우에도 응답을 전송하지 않기 때문에 메시지가 브로커의 로그에 성공적으로 기록되었다고 보장 할 수 없습니다.

retries (기본값> 0)는 제작자가 실패한 후에 메시지를 다시 보내려고하는지 결정합니다. 재 시도가 0보다 큰 경우, 다음 쓰기가 성공한 후에 재 시도가 발생할 수 있기 때문에 메시지 순서 재 지정이 발생할 수 있습니다.

카프카 제작자는 처리량을 높이기 위해 보낸 메시지를 일괄 적으로 수집하려고합니다. Java 클라이언트를 사용하면 batch.size 를 사용하여 각 메시지 일괄 처리의 최대 크기 (바이트)를 제어 할 수 있습니다. 배치를 채우기 위해 더 많은 시간을주기 위해 linger.ms 를 사용하여 생산자가 전송을 지연하게 할 수 있습니다. 마지막으로 compression.type 설정을 사용하여 compression.type 활성화 할 수 있습니다.

buffer.memory 를 사용하여 보내지 않은 메시지를 수집하기 위해 Java 클라이언트에서 사용할 수있는 전체 메모리를 제한하십시오. 이 한계에 도달하면 제작자는 예외를 발생 max.block.ms 전에 max.block.ms 만큼 추가 전송을 차단합니다. 또한 레코드를 무한정 큐에 보관하지 않으려면 request.timeout.ms 사용하여 시간 제한을 설정할 수 있습니다.

전체 속성 목록은 여기에서 볼 수 있습니다 . 자세한 내용은 Confluent 에서이 기사 를 읽으십시오.

메시지 보내기

send() 메소드는 비동기식입니다. 호출되면 보류중인 레코드 전송 버퍼에 레코드를 추가하고 즉시 반환합니다. 이를 통해 생산자는 효율성을 위해 개별 레코드를 함께 배치 할 수 있습니다.

send의 결과는 레코드가 보내진 파티션과 할당 된 오프셋을 지정하는 RecordMetadata 입니다. send 호출이 비동기 적이므로이 레코드에 할당 될 RecordMetadata의 Future 를 반환합니다. 메타 데이터를 참조하려면 요청이 완료되거나 콜백을 사용할 때까지 차단할 get() 호출 할 수 있습니다.

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

코드

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow