apache-kafka
Группы потребителей и управление смещением
Поиск…
параметры
| параметр | Описание |
|---|---|
| group.id | Название группы потребителей. |
| enable.auto.commit | Автоматически совершать смещения; default: true . |
| auto.commit.interval.ms | Минимальная задержка в миллисекундах между enable.auto.commit=true (требуется enable.auto.commit=true ); default: 5000 . |
| auto.offset.reset | Что делать, если не найдено допустимого зафиксированного смещения; default: последний . (+) |
| (+) Возможные значения | Описание |
| раннее | Автоматически сбросить смещение до самого раннего смещения. |
| самый последний | Автоматически сбросить смещение до последнего смещения. |
| никто | Выбросьте исключение для потребителя, если для группы потребителей не найдено предыдущего смещения. |
| что-нибудь еще | Выбросьте исключение для потребителя. |
Что такое группа потребителей
Начиная с Kafka 0.9, новый клиент высокого уровня KafkaConsumer доступен . Он использует новый встроенный протокол Kafka, который позволяет объединить несколько потребителей в так называемую группу потребителей . Группа потребителей может быть описана как единый логический потребитель, который подписывается на набор тем. Частицы по всем темам относятся к физическим потребителям внутри группы, так что каждое утверждение назначается для исключения одного потребителя (один потребитель может получить несколько назначенных партитонов). Индивидуальные потребители, принадлежащие к одной группе, могут работать на разных хостах распределенным образом.
Группы потребителей идентифицируются через их group.id . Чтобы создать конкретного члена экземпляра клиента в группе пользователей, достаточно назначить группе group.id этому клиенту через конфигурацию клиента:
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
Таким образом, все потребители, которые подключаются к одному кластеру Kafka и используют один и тот же group.id образуют группу потребителей. Потребители могут покинуть группу в любое время, и новые пользователи могут присоединиться к группе в любое время. В обоих случаях инициируется так называемое перебалансирование, и разделы перераспределяются с помощью группы потребителей, чтобы гарантировать, что каждый раздел обрабатывается одним пользователем из группы.
Обратите внимание, что даже один KafkaConsumer формирует Группу Потребителей с собой как единый член.
Управление смещением потребителей и отказоустойчивость
KafkaConsumers запрашивает сообщения от брокера Kafka по вызову poll() и их прогресс отслеживается посредством смещений . Каждое сообщение в каждом разделе каждой темы имеет так называемое смещение - его номер логической последовательности внутри раздела. KafkaConsumer отслеживает текущее смещение для каждого раздела, который ему назначен. Обратите внимание, что брокеры Kafka не знают о текущих смещениях потребителей. Таким образом, на poll() потребитель должен отправить свои текущие смещения брокеру, чтобы брокер мог вернуть соответствующие сообщения, т. Е. сообщений с большим последовательным смещением. Например, предположим, что у нас есть одна тема раздела и один потребитель с текущим смещением 5. В poll() потребитель отправляет, если смещение брокера и брокер возвращают сообщения для смещений 6,7,8, ...
Поскольку потребители сами отслеживают свои смещения, эта информация может потеряться, если потребитель терпит неудачу. Таким образом, смещения должны быть надежно сохранены, так что при перезагрузке потребитель может забрать свое прежнее смещение и возобновить, где он остался. В Kafka есть встроенная поддержка для этого посредством смещения . Новый KafkaConsumer может передать свое текущее смещение Kafka, и Kafka сохраняет эти смещения в специальной теме под названием __consumer_offsets . Хранение смещений в рамках темы Kafka не просто отказоустойчиво, но позволяет переназначить разделы другим потребителям во время перебалансировки. Поскольку все потребители Группы потребителей могут получить доступ ко всем фиксированным смещениям всех разделов, при перебалансировке потребитель, которому присваивается новый раздел, просто считывает зафиксированное смещение этого раздела из темы __consumer_offsets и возобновляет работу, где остался старый потребитель.
Как совершать смещения
KafkaConsumers может автоматически выполнять смещения в фоновом режиме (параметр конфигурации enable.auto.commit = true ), что является настройкой по умолчанию. Эти автоматические фиксации выполняются в пределах poll() ( который обычно вызывается в цикле ). Как часто должны выполняться смещения, их можно настроить с помощью auto.commit.interval.ms . Поскольку автоматические коммиты встроены в poll() а poll() вызывается кодом пользователя, этот параметр определяет нижнюю границу интервала между фиксацией.
В качестве альтернативы автоматической фиксации, смещения также могут управляться вручную. Для этого автоматическая фиксация должна быть отключена ( enable.auto.commit = false ). Для ручной KafkaConsumers предлагает два метода: commitSync () и commitAsync () . Как указано в имени, commitSync() является блокирующим вызовом, который возвращает после успешного завершения смещений, а commitAsync() немедленно возвращается. Если вы хотите узнать, была ли фиксация успешной или нет, вы можете предоставить обработчик обратного вызова ( OffsetCommitCallback ) параметр метода. Обратите внимание, что в обоих вызовах фиксации потребитель совершает смещения последнего вызова poll() . Например. предположим, что одна тема раздела с одним потребителем и последний вызов poll() возвращают сообщения со смещениями 4,5,6. При фиксации будет выполнено смещение 6, потому что это последнее смещение, отслеживаемое потребительским клиентом. В то же время оба commitSync() и commitAsync() позволяют больше контролировать, какое смещение вы хотите зафиксировать: если вы используете соответствующие перегрузки, которые позволяют вам указать Map<TopicPartition, OffsetAndMetadata> потребитель будет совершать только указанные смещения (т. е. карта может содержать любое подмножество назначенных разделов, а указанное смещение может иметь любое значение).
Семантика совершенных смещений
Зафиксированное смещение указывает, что все сообщения до этого смещения уже обработаны. Таким образом, поскольку смещения являются последовательными числами, совершение смещения X неявно совершает все смещения, меньшие, чем X Следовательно, нет необходимости совершать каждое смещение отдельно и совершать сразу несколько смещений одновременно, но просто совершает наибольшее смещение.
Обратите внимание, что по дизайну также возможно совершить меньшее смещение, чем последнее зафиксированное смещение. Это можно сделать, если сообщения должны быть прочитаны во второй раз.
Гарантии обработки
Использование автоматической фиксации обеспечивает семантику обработки по меньшей мере один раз. Исходное предположение состоит в том, что poll() вызывается только после того, как все ранее доставленные сообщения обработаны успешно. Это гарантирует, что никакое сообщение не будет потеряно, потому что фиксация произойдет после обработки. Если потребитель совершает ошибку до фиксации, все сообщения после последнего фиксации принимаются от Kafka и обрабатываются снова. Однако повторная попытка может привести к дублированию, поскольку некоторое сообщение из последнего вызова poll() могло быть обработано, но сбой произошел непосредственно перед вызовом автоматической фиксации.
Если требуется семантика обработки как commitSync() один раз, автоматическая фиксация должна быть отключена, и необходимо вручную commitSync() непосредственно после poll() . После этого сообщения обрабатываются. Это гарантирует, что сообщения будут совершены до того, как они будут обработаны и, таким образом, никогда не будут прочитаны во второй раз. Конечно, некоторое сообщение может потеряться в случае сбоя.
Как я могу прочитать тему с ее начала?
Существует множество стратегий для чтения темы с самого начала. Чтобы объяснить это, нам сначала нужно понять, что происходит при запуске потребителя. При запуске потребителя происходит следующее:
- присоединиться к настроенной группе потребителей, которая запускает перебалансировку и назначает разделы для потребителя
- искать совершенные смещения (для всех разделов, которые были присвоены потребителю)
- для всех разделов с допустимым смещением, возобновить с этого смещения
- для всех разделов с недопустимым смещением установите начальное смещение в соответствии с параметром конфигурации
auto.offset.reset
Начать новую группу потребителей
Если вы хотите обработать тему с ее начала, вы можете просто начать новую группу потребителей (т. group.id Выбрать неиспользуемый group.id ) и установить auto.offset.reset = earliest . Поскольку для новой группы нет совершенных смещений, произойдет сброс автоматического смещения, и тема будет потребляться с самого начала. Обратите внимание, что при повторном перезагрузке пользователя, если вы снова используете тот же group.id , он не будет читать тему с начала, но возобновите ее, где она осталась. Таким образом, для этой стратегии вам нужно будет назначить новый group.id каждый раз, когда вы хотите прочитать тему с самого начала.
Повторное использование одного и того же идентификатора группы
Чтобы избежать установки новой group.id каждый раз, когда вы хотите прочитать тему с ее начала, вы можете отключить автоматическую фиксацию (через enable.auto.commit = false ) перед запуском потребителя в первый раз (используя неиспользуемую group.id и установка auto.offset.reset = earliest ). Кроме того, вы не должны выполнять какие-либо смещения вручную. Поскольку смещения никогда не выполняются с использованием этой стратегии, при перезапуске потребитель снова прочитает эту тему с самого начала.
Однако эта стратегия имеет два недостатка:
- он не является отказоустойчивым
- групповое перебалансирование не работает должным образом
(1) Поскольку смещения никогда не выполняются, сбой и остановленный потребитель обрабатываются одинаково при перезагрузке. В обоих случаях эта тема будет потребляться с самого начала. (2) Поскольку смещение никогда не выполняется, при перебалансировке вновь назначенные разделы будут потребителями с самого начала.
Поэтому эта стратегия работает только для групп потребителей с одним потребителем и должна использоваться только для целей развития.
Повторное использование одного и того же идентификатора группы и фиксации
Если вы хотите быть отказоустойчивым и / или использовать несколько потребителей в своей группе потребителей, обязательными являются смещения. Таким образом, если вы хотите прочитать тему с самого начала, вам нужно манипулировать совершенными смещениями при запуске пользователя. Для этого KafkaConsumer предоставляет три метода seek() , seekToBeginning() и seekToEnd() . Хотя seek() может использоваться для установки произвольного смещения, второй и третий методы могут использоваться для поиска в начале или конце раздела, соответственно. Таким образом, при сбое и при попытке перезагрузки потребителей будет исключен, и потребитель может вернуться туда, где он остался. Для consumer-stop-and-restart-from- seekToBeginning() будет вызываться явно до того, как вы войдете в цикл poll() . Обратите внимание, что seekXXX() может использоваться только после того, как потребитель присоединился к группе - таким образом, перед использованием seekXXX() требуется выполнить «фиктивный опрос». Общий код будет примерно таким:
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}