apache-kafka
소비자 그룹 및 오프셋 관리
수색…
매개 변수
| 매개 변수 | 기술 |
|---|---|
| group.id | 소비자 그룹의 이름. |
| enable.auto.commit | 자동으로 오프셋을 커밋합니다. 기본값 : true . |
| auto.commit.interval.ms | 커밋 사이의 최소 지연 시간 (밀리 초 단위) ( enable.auto.commit=true 필요); 기본값 : 5000 . |
| auto.offset.reset | 유효한 커밋 된 오프셋이 발견되지 않을 때 수행 할 작업; 기본값 : 최신 (+) |
| (+) 가능한 값 | 기술 |
| 가장 이른 | 오프셋을 가장 빠른 오프셋으로 자동 재설정합니다. |
| 최근 | 오프셋을 최신 오프셋으로 자동 재설정합니다. |
| 없음 | 소비자 그룹에 대해 이전 오프셋이 발견되지 않으면 소비자에게 예외를 던집니다. |
| 다른 것 | 소비자에게 예외를 던집니다. |
소비자 그룹이란 무엇인가?
Kafka 0.9부터 새로운 고급 KafkaConsumer 클라이언트가 사용 가능합니다. 이 제품은 소위 소비자 그룹 ( Consumer Group) 에서 여러 소비자를 결합 할 수있는 새로운 내장형 Kafka 프로토콜을 이용 합니다. 소비자 그룹은 일련의 주제를 구독하는 단일 논리적 소비자로 설명 될 수 있습니다. 모든 주제에 대한 부분은 그룹 내의 실제 소비자에게 양보되며, 각 특허는 하나의 소비자에게 할당됩니다 (단일 소비자가 여러 명의 개인을 배정받을 수 있음). 동일한 그룹에 속하는 개별 소비자는 분산 된 방식으로 다른 호스트에서 실행될 수 있습니다.
소비자 그룹은 group.id 를 통해 식별됩니다. 컨슈머 그룹의 특정 클라이언트 인스턴스 멤버를 만들려면 클라이언트 구성을 통해 group.id 그룹을이 클라이언트에 할당하면 충분합니다.
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
따라서 동일한 카프카 클러스터에 연결하고 동일한 group.id 사용하는 모든 소비자는 소비자 그룹을 형성합니다. 소비자는 언제든지 그룹을 탈퇴 할 수 있으며 새로운 소비자는 언제든지 그룹에 가입 할 수 있습니다. 두 경우 모두 소위 재조정 이 트리거되고 파티션이 소비자 그룹과 재 할당되어 각 파티션이 그룹 내의 한 명의 소비자에 의해 처리되도록합니다.
단 하나의 KafkaConsumer 조차도 단일 회원으로 구성된 소비자 그룹을 형성한다는 점에 KafkaConsumer .
소비자 오프셋 관리 및 내결함성
KafkaConsumers 는 poll() 호출을 통해 Kafka 브로커의 메시지를 요청하고 그 진행 상황을 오프셋을 통해 추적 합니다 . 각 토픽의 각 파티션 내의 각 메시지에는 파티션 내에 소위 오프셋 (offset)이 할당되어 있습니다. KafkaConsumer 는 할당 된 각 파티션의 현재 오프셋을 추적합니다. 카프카 브로커는 소비자의 현재 오프셋을 알지 못한다는 점에 유의하십시오. 따라서 poll() 에서 소비자는 현재 오프셋을 브로커에 보내야 브로커가 해당 메시지 즉,. 더 큰 연속적인 오프셋을 갖는 메시지. 예를 들어, 단일 파티션 주제와 현재 오프셋이 5 인 단일 소비자가 있다고 가정 해 봅시다. poll() 에서 소비자는 브로커 오프셋과 브로커가 오프셋 6,7,8, ...에 대한 메시지를 반환하면 보냅니다.
소비자가 오프셋을 추적하기 때문에 소비자가 실패하면이 정보가 손실 될 수 있습니다. 따라서 오프셋은 안정적으로 저장되어야합니다. 다시 시작할 때 소비자는 이전 오프셋과 상수를 가져올 수 있습니다. Kafka에는 오프셋 커밋을 통해 내장 된 지원 기능이 있습니다. 새로운 KafkaConsumer 는 현재 오프셋을 Kafka에 위탁 할 수 있으며 Kafka는 __consumer_offsets 라는 특수한 주제에 이러한 오프셋을 저장합니다. 카프카 (Kafka) 주제에 오프셋을 저장하는 것은 내결함성뿐만 아니라 재조정 도중에 파티션을 다른 소비자에게 재 할당하는 것을 허용합니다. 컨슈머 그룹의 모든 소비자는 모든 파티션의 모든 커밋 된 오프셋에 액세스 할 수 있기 때문에 재조정에 따라 할당 된 새 파티션을 얻는 소비자는 __consumer_offsets 항목에서이 파티션의 커밋 된 오프셋을 읽고 이전 소비자가 있던 곳에서 다시 시작합니다.
오프셋을 적용하는 방법
KafkaConsumers 는 백그라운드에서 자동으로 오프셋을 적용 할 수 있습니다 (구성 매개 변수 enable.auto.commit = true ). 기본 설정은 무엇입니까? 이러한 자동 커밋은 poll() ( 일반적으로 루프에서 호출 됨 poll() 내에서 수행됩니다. 오프셋을 커밋해야하는 빈도는 auto.commit.interval.ms 를 통해 구성 할 수 있습니다. 자동 커밋은 poll() 되고 poll() 은 사용자 코드에 의해 호출되기 때문에이 매개 변수는 인터 커밋 간격에 대한 하한을 정의합니다.
자동 커밋의 대안으로 오프셋을 수동으로 관리 할 수도 있습니다. 이 경우 자동 커밋을 사용하지 않도록 설정해야합니다 ( enable.auto.commit = false ). 수동 커밋의 경우 KafkaConsumers 는 commitSync () 및 commitAsync ()의 두 가지 메서드를 제공합니다. 이름에서 알 수 있듯이 commitSync() 는 차단 호출이며 오프셋이 성공적으로 커밋 된 후 반환되며 commitAsync() 는 즉시 반환됩니다. 커밋이 성공했는지 여부를 알고 싶으면 콜백 처리기 ( OffsetCommitCallback ) 메서드 매개 변수를 제공 할 수 있습니다. 두 커밋 호출 모두에서 소비자는 최신 poll() 호출의 오프셋을 커밋합니다. 예를 들어. 단일 소비자가있는 단일 파티션 주제를 가정하고 poll() 대한 마지막 호출은 오프셋 4,5,6이있는 메시지를 반환합니다. 커밋시 소비자 클라이언트가 추적 한 최신 오프셋이므로 오프셋 6이 커밋됩니다. 동시에 commitSync() 및 commitAsync() 는 커밋 할 오프셋을 더 많이 제어 할 수 있습니다. Map<TopicPartition, OffsetAndMetadata> 을 지정할 수있는 해당 오버로드를 사용하면 소비자는 지정된 오프셋 만 커밋합니다 (즉, 맵은 할당 된 파티션의 하위 세트를 포함 할 수 있으며 지정된 오프셋은 임의의 값을 가질 수 있습니다).
커밋 된 오프셋의 의미
커밋 된 오프셋은이 오프셋까지의 모든 메시지가 이미 처리되었음을 나타냅니다. 따라서 오프셋은 연속 번호이므로 커밋 오프셋 X 는 X 보다 작은 모든 오프셋을 암시 적으로 커밋합니다. 따라서 각 오프셋을 개별적으로 커밋 할 필요가 없으며 한 번에 여러 오프셋을 커밋하는 것이 가장 큰 오프셋을 커밋하지만 발생합니다.
디자인에 따르면 마지막 커밋 된 오프셋보다 작은 오프셋을 커밋 할 수도 있습니다. 메시지를 다시 읽어야 할 경우이 작업을 수행 할 수 있습니다.
가공 보증
자동 커밋을 사용하면 적어도 한 번 처리하는 의미를 제공합니다. poll() 은 이전에 전달 된 모든 메시지가 성공적으로 처리 된 후에 만 호출됩니다. 이렇게하면 처리 후에 커밋이 발생하기 때문에 메시지가 손실되지 않습니다. 커밋 전에 소비자가 실패하면 마지막 커밋 이후의 모든 메시지가 Kafka에서 수신되어 다시 처리됩니다. 그러나이 재시도는 마지막 poll() 호출의 일부 메시지가 처리되었지만 자동 커밋 호출 직전에 오류가 발생하여 중복이 발생할 수 있습니다.
at-most-once 처리 의미가 필요한 경우 자동 커밋을 비활성화하고 poll() 후에 직접 commitSync() 수행해야합니다. 그 다음에 메시지가 처리됩니다. 이렇게하면 메시지가 처리 되기 전에 커밋되어 두 번째로 읽히지 않습니다. 물론 오류가 발생하면 일부 메시지가 손실 될 수 있습니다.
주제를 처음부터 어떻게 읽을 수 있습니까?
주제를 처음부터 읽는 여러 전략이 있습니다. 이를 설명하기 위해, 우리는 소비자 시동시 어떤 일이 발생하는지 먼저 이해해야합니다. 소비자가 시작할 때 다음과 같은 일이 발생합니다.
- 구성된 소비자 그룹에 참여하여 재조정을 시작하고 소비자에게 파티션을 할당합니다.
- 커밋 된 오프셋 (소비자에게 할당 된 모든 파티션)을 찾습니다.
- 유효한 오프셋이있는 모든 파티션에 대해이 오프셋에서 다시 시작
- 유효한 오프셋이없는 모든 파티션의 경우
auto.offset.reset구성 매개 변수에 따라 시작 오프셋을 설정하십시오.
새로운 소비자 그룹 창립
처음부터 주제를 처리하려는 경우 새 소비자 그룹을 시작 (즉, 사용하지 않은 group.id 선택)하고 auto.offset.reset = earliest 설정할 수 있습니다. 새 그룹에 커밋 된 오프셋이 없으므로 자동 오프셋 재설정이 트리거되고 주제는 처음부터 사용됩니다. 소비자 재시작시 동일한 group.id 다시 사용하면 다시 처음부터 주제를 읽지 않고 group.id 다시 시작한다는 점에 group.id 하십시오. 따라서이 전략을 사용하려면 처음부터 주제를 읽을 때마다 새 group.id 를 할당해야합니다.
동일한 그룹 ID 재사용
처음부터 주제를 읽으 group.id 할 때마다 새 group.id 설정하지 않으려면 사용하지 않은 group.id 사용하여 처음으로 소비자를 시작하기 전에 enable.auto.commit = false 를 통해 자동 커밋을 비활성화 할 수 있습니다 group.id 및 auto.offset.reset = earliest 설정). 또한 수동으로 오프셋을 적용해서는 안됩니다. 이 전략을 사용하면 오프셋을 적용하지 않으므로 다시 시작할 때 소비자는 처음부터 주제를 읽을 수 있습니다.
그러나이 전략에는 다음과 같은 두 가지 단점이 있습니다.
- 내결함성이 아니다.
- 그룹 재조정이 의도 한대로 작동하지 않습니다.
(1) 오프셋은 절대로 커밋되지 않으므로 실패한 소비자와 중지 된 소비자는 재시작시 동일한 방식으로 처리됩니다. 두 경우 모두 주제가 처음부터 사용됩니다. (2) 오프셋은 절대로 커밋되지 않으므로 재 할당시 새로 할당 된 파티션은 처음부터 소비자가됩니다.
따라서이 전략은 단일 소비자가있는 소비자 그룹에만 적용되며 개발 목적으로 만 사용해야합니다.
동일한 그룹 ID 및 커밋 재사용
내결함성을 유지하고 소비자 그룹에서 여러 소비자를 사용하려면 커밋 오프셋이 필수입니다. 따라서 처음부터 주제를 읽으려면 소비자 시작시 커밋 된 오프셋을 조작해야합니다. 이를 위해 KafkaConsumer 는 seek() , seekToBeginning() 및 seekToEnd() 세 가지 메소드를 제공합니다. seek() 은 임의의 오프셋을 설정하는 데 사용할 수 있지만 두 번째 및 세 번째 방법은 각각 파티션의 시작 또는 끝을 찾는 데 사용할 수 있습니다. 따라서, 실패시 및 소비자 재시작시에 검색은 생략되고 소비자는 그것이 남겨진 곳에서 재개 할 수있다. 소비자가 stop-and-restart를 시작할 때 poll() 루프를 시작하기 전에 seekToBeginning() 이 명시 적으로 호출됩니다. seekXXX() 는 소비자가 그룹에 가입 한 후에 만 사용할 수 있으므로 seekXXX() 를 사용하기 전에 "더미 폴"을 수행해야합니다. 전체 코드는 다음과 같습니다.
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}