apache-kafka
Grupy konsumentów i zarządzanie offsetami
Szukaj…
Parametry
| Parametr | Opis |
|---|---|
| Identyfikator grupy | Nazwa grupy konsumenckiej. |
| enable.auto.commit | Automatycznie zatwierdzaj przesunięcia; domyślnie: prawda . |
| auto.commit.interval.ms | Minimalne opóźnienie w milisekundach między zatwierdzeniami (wymaga enable.auto.commit=true ); domyślnie: 5000 . |
| auto.offset.reset | Co zrobić, gdy nie znaleziono ważnego zatwierdzonego przesunięcia; domyślnie: najnowsze . (+) |
| (+) Możliwe wartości | Opis |
| najwcześniej | Automatycznie resetuj przesunięcie do najwcześniejszego przesunięcia. |
| najnowszy | Automatycznie resetuj przesunięcie do najnowszego przesunięcia. |
| Żaden | Zgłaszaj wyjątek dla konsumenta, jeśli nie znaleziono wcześniejszego przesunięcia dla grupy konsumentów. |
| coś jeszcze | Rzuć wyjątek konsumentowi. |
Co to jest grupa konsumentów
Od wersji Kafka 0.9 dostępny jest nowy klient KafkaConsumer wysokiego poziomu. Wykorzystuje nowy wbudowany protokół Kafka, który pozwala łączyć wielu konsumentów w tzw. Grupę konsumencką . Grupę konsumentów można opisać jako pojedynczego logicznego konsumenta, który subskrybuje zestaw tematów. Podziały na wszystkie tematy są przypisywane fizycznym konsumentom w grupie, tak, że każda ścieżka przypisana jest jednemu jednemu konsumentowi (jednemu konsumentowi można przypisać wiele partycji). Poszczególni konsumenci należący do tej samej grupy mogą działać na różnych hostach w sposób rozproszony.
Grupy konsumentów są identyfikowane poprzez ich group.id . Aby przypisać konkretną instancję klienta do grupy konsumenckiej, wystarczy przypisać group.id do tego klienta za pomocą konfiguracji klienta:
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
Tak więc wszyscy konsumenci, którzy łączą się z tym samym klastrem Kafka i używają tej samej group.id Tworzą grupę konsumentów. Konsumenci mogą opuścić grupę w dowolnym momencie, a nowi klienci mogą dołączyć do grupy w dowolnym momencie. W obu przypadkach uruchamiane jest tak zwane ponowne równoważenie, a partycje są ponownie przypisywane do grupy konsumentów, aby zapewnić, że każda partycja jest przetwarzana przez jednego konsumenta w grupie.
Zwróć uwagę, że nawet pojedynczy KafkaConsumer tworzy Grupę Konsumentów, a sam jako pojedynczy członek.
Zarządzanie offsetami konsumenckimi i tolerancja na awarie
KafkaConsumers żąda wiadomości od brokera Kafka za pośrednictwem wywołania poll() a ich postępy są śledzone za pomocą przesunięć . Każda wiadomość w obrębie każdej partycji każdego tematu ma przypisane tak zwane przesunięcie - jego logiczny numer sekwencyjny w obrębie partycji. KafkaConsumer śledzi swoje bieżące przesunięcie dla każdej przypisanej do niego partycji. Zwróć uwagę, że brokerzy Kafka nie są świadomi obecnych przesunięć konsumentów. Zatem w poll() konsument musi wysłać swoje bieżące przesunięcia do brokera, tak aby broker mógł zwrócić odpowiednie wiadomości, tj. wiadomości z większym kolejnym przesunięciem. Załóżmy na przykład, że mamy jeden temat partycji i jednego konsumenta z bieżącym przesunięciem 5. W momencie poll() konsument wysyła, jeśli przesunięcie do brokera i broker zwraca komunikaty o przesunięciach 6,7,8, ...
Ponieważ konsumenci sami śledzą swoje przesunięcia, informacje te mogą zostać utracone w przypadku awarii konsumenta. Zatem przesunięcia muszą być przechowywane w sposób niezawodny, tak aby przy ponownym uruchomieniu konsument mógł odebrać swoje stare przesunięcie i wznowić od miejsca, w którym opuścił. W Kafce jest wbudowane wsparcie dla tego poprzez przesunięcia offsetów . Nowy KafkaConsumer może zatwierdzić swoje bieżące przesunięcie do Kafki, a Kafka przechowuje te przesunięcia w specjalnym temacie o nazwie __consumer_offsets . Przechowywanie przesunięć w temacie Kafka jest nie tylko odporne na awarie, ale umożliwia także ponowne przypisanie partycji do innych konsumentów podczas ponownego równoważenia. Ponieważ wszyscy konsumenci grupy konsumenckiej mogą uzyskać dostęp do wszystkich zatwierdzonych przesunięć wszystkich partycji, po ponownym zrównoważeniu konsument, który otrzymuje nową partycję, po prostu odczytuje zatwierdzone przesunięcie tej partycji z tematu __consumer_offsets i wznawia, gdzie poprzedni konsument opuścił.
Jak zatwierdzać przesunięcia
KafkaConsumers może automatycznie dokonywać przesunięć w tle (parametr konfiguracyjny enable.auto.commit = true ), co jest ustawieniem domyślnym. Te automatyczne zatwierdzenia są wykonywane w poll() ( które zwykle jest wywoływane w pętli ). auto.commit.interval.ms przesunięć może być konfigurowana przez auto.commit.interval.ms . Ponieważ automatyczne zatwierdzenia są osadzone w poll() a poll() jest wywoływany przez kod użytkownika, ten parametr określa dolną granicę dla interwału między zatwierdzeniami.
Jako alternatywę dla automatycznego zatwierdzania przesunięciami można również zarządzać ręcznie. W tym celu automatyczne zatwierdzanie powinno być wyłączone ( enable.auto.commit = false ). Do ręcznego KafkaConsumers oferuje dwie metody, mianowicie commitSync () i commitAsync () . Jak sama nazwa wskazuje, commitSync() to wywołanie blokujące, które powraca po pomyślnym commitAsync() przesunięć, podczas gdy commitAsync() zwraca natychmiast. Jeśli chcesz wiedzieć, czy zatwierdzenie zakończyło się pomyślnie, możesz podać OffsetCommitCallback obsługi wywołania OffsetCommitCallback ( OffsetCommitCallback ) parametr metody. Zwróć uwagę, że w obu wywołaniach zatwierdzenia konsument popełnia przesunięcia ostatniego wywołania poll() . Na przykład. załóżmy, że temat dotyczy jednej partycji z jednym konsumentem, a ostatnie wywołanie poll() zwraca wiadomości z przesunięciami 4,5,6. Przy zatwierdzeniu przesunięcie 6 zostanie zatwierdzone, ponieważ jest to najnowsze przesunięcie śledzone przez klienta klienta. Jednocześnie zarówno commitSync() i commitAsync() pozwalają na większą kontrolę nad tym, jakie przesunięcie chcesz zatwierdzić: jeśli użyjesz odpowiednich przeciążeń, które pozwalają określić Map<TopicPartition, OffsetAndMetadata> konsument popełni tylko określone przesunięcia (tzn. mapa może zawierać dowolny podzbiór przypisanych partycji, a określone przesunięcie może mieć dowolną wartość).
Semantyka popełnionych przesunięć
Zatwierdzone przesunięcie wskazuje, że wszystkie wiadomości do tego przesunięcia zostały już przetworzone. Zatem, ponieważ przesunięcia są kolejnymi liczbami, zatwierdzenie przesunięcia X domyślnie zatwierdza wszystkie przesunięcia mniejsze niż X Dlatego nie jest konieczne indywidualne zatwierdzanie każdego przesunięcia, a wielokrotne przesunięcia naraz zdarzają się, ale tylko największe przesunięcie.
Zwróć uwagę, że zgodnie z projektem możliwe jest również zatwierdzenie mniejszego przesunięcia niż ostatnie zatwierdzone przesunięcie. Można to zrobić, jeśli wiadomości będą czytane po raz drugi.
Gwarancje przetwarzania
Korzystanie z automatycznego zatwierdzania zapewnia semantykę przetwarzania co najmniej raz. Podstawowym założeniem jest to, że poll() jest wywoływana dopiero po pomyślnym przetworzeniu wszystkich wcześniej dostarczonych wiadomości. Zapewnia to, że żaden komunikat nie zostanie utracony, ponieważ zatwierdzenie następuje po przetworzeniu. Jeśli konsument zawiedzie przed zatwierdzeniem, wszystkie wiadomości po ostatnim zatwierdzeniu są odbierane od Kafki i przetwarzane ponownie. Ponowna próba może jednak spowodować duplikaty, ponieważ niektóre wiadomości z ostatniego wywołania poll() mogły zostać przetworzone, ale błąd wystąpił tuż przed wywołaniem automatycznego zatwierdzenia.
Jeśli wymagana jest semantyka przetwarzania co najwyżej raz, automatyczne zatwierdzanie musi być wyłączone i należy wykonać ręczne commitSync() bezpośrednio po poll() . Następnie wiadomości są przetwarzane. Zapewnia to, że wiadomości są zatwierdzane przed ich przetworzeniem, a zatem nigdy nie będą czytane po raz drugi. Oczywiście niektóre wiadomości mogą zostać utracone w przypadku awarii.
Jak mogę przeczytać temat od samego początku
Istnieje wiele strategii czytania tematu od samego początku. Aby to wyjaśnić, najpierw musimy zrozumieć, co dzieje się podczas uruchamiania klienta. Po uruchomieniu klienta następują:
- dołącz do skonfigurowanej grupy konsumentów, która wyzwala ponowne równoważenie i przypisuje partycje do konsumenta
- poszukaj popełnionych przesunięć (dla wszystkich partycji, które zostały przypisane do konsumenta)
- dla wszystkich partycji z prawidłowym przesunięciem, wznów od tego przesunięcia
- dla wszystkich partycji z niepoprawnym przesunięciem ustaw przesunięcie początkowe zgodnie z parametrem konfiguracyjnym
auto.offset.reset
Załóż nową grupę konsumentów
Jeśli chcesz przetworzyć temat od samego początku, możesz po prostu założyć nową grupę konsumentów (tj. Wybrać nieużywany group.id ) i ustawić auto.offset.reset = earliest . Ponieważ nie ma zatwierdzonych przesunięć dla nowej grupy, nastąpi automatyczne resetowanie przesunięcia i temat zostanie zużyty od samego początku. Zwróć uwagę, że przy ponownym uruchomieniu konsumenta, jeśli ponownie użyjesz tej samej grupy. group.id , nie przeczyta tematu od początku, ale wznowi tam, gdzie go opuścił. Dlatego do tej strategii musisz przypisać nową group.id każdym razem, gdy chcesz przeczytać temat od samego początku.
Ponownie użyj tego samego identyfikatora grupy
Aby uniknąć ustawiania nowego group.id każdym razem, gdy chcesz przeczytać temat od samego początku, możesz wyłączyć automatyczne zatwierdzanie (przez enable.auto.commit = false ) przed uruchomieniem konsumenta po raz pierwszy (przy użyciu nieużywanej group.id i ustawienie auto.offset.reset = earliest ). Ponadto nie należy ręcznie dokonywać żadnych przesunięć. Ponieważ przesunięcia nigdy nie są popełniane przy użyciu tej strategii, po ponownym uruchomieniu konsument będzie czytał ten temat od początku.
Jednak ta strategia ma dwie wady:
- nie jest odporny na uszkodzenia
- przywracanie równowagi grupy nie działa zgodnie z przeznaczeniem
(1) Ponieważ przesunięcia nigdy nie są popełniane, niesprawny i zatrzymany konsument jest obsługiwany w ten sam sposób przy ponownym uruchomieniu. W obu przypadkach temat zostanie zużyty od samego początku. (2) Ponieważ przesunięcie nigdy nie jest zatwierdzane, przy ponownym równoważeniu nowo przypisane partycje będą od samego początku konsumentami.
Dlatego ta strategia działa tylko w przypadku grup konsumentów z jednym konsumentem i powinna być wykorzystywana wyłącznie do celów programistycznych.
Ponownie użyj tego samego identyfikatora grupy i zatwierdzenia
Jeśli chcesz być odporny na awarie i / lub korzystać z wielu konsumentów w swojej grupie konsumentów, dopuszczenie się kompensacji jest obowiązkowe. Dlatego jeśli chcesz przeczytać temat od samego początku, musisz manipulować zatwierdzonymi przesunięciami podczas uruchamiania klienta. W tym celu KafkaConsumer udostępnia trzy metody seek() , seekToBeginning() i seekToEnd() . Podczas gdy seek() może być użyte do ustawienia dowolnego przesunięcia, druga i trzecia metoda mogą być użyte do wyszukiwania odpowiednio do początku lub końca partycji. Dlatego w przypadku niepowodzenia i ponownego uruchomienia konsumenta wyszukiwanie zostanie pominięte, a konsument może wznowić od miejsca, w którym opuścił. Dla konsumenta stop-and-restartuj-od-początku, seekToBeginning() zostanie wywołane jawnie przed wejściem w pętlę poll() . Zauważ, że seekXXX() może być użyte tylko wtedy, gdy konsument dołączył do grupy - dlatego przed użyciem seekXXX() wymagane jest przeprowadzenie „fikcyjnego sondażu”. Ogólny kod wyglądałby mniej więcej tak:
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}