apache-kafka
Grupos de consumidores y gestión de compensaciones
Buscar..
Parámetros
| Parámetro | Descripción |
|---|---|
| Identificación del grupo | El nombre del Grupo de Consumidores. |
| enable.auto.commit | Confirmar automáticamente las compensaciones; por defecto: verdadero |
| auto.commit.interval.ms | El retraso mínimo en milisegundos entre a las confirmaciones (requiere enable.auto.commit=true ); por defecto: 5000 . |
| auto.offset.reset | Qué hacer cuando no se encuentra un desplazamiento confirmado válido; por defecto: la última . (+) |
| (+) Valores posibles | Descripción |
| más temprano | Restablecer automáticamente el desplazamiento al primer desplazamiento. |
| último | Restablecer automáticamente el desplazamiento a la última compensación. |
| ninguna | Lanzar excepción al consumidor si no se encuentra un desplazamiento anterior para el grupo de consumidores. |
| Algo más | Lanzar excepción al consumidor. |
¿Qué es un grupo de consumidores?
A partir de Kafka 0.9, el nuevo cliente de alto nivel KafkaConsumer está disponible. Explota un nuevo protocolo Kafka incorporado que permite combinar múltiples consumidores en un llamado Grupo de Consumidores . Un grupo de consumidores puede describirse como un único consumidor lógico que se suscribe a un conjunto de temas. Las partes de todos los temas se asignan a los consumidores físicos dentro del grupo, de modo que cada una de ellas se asigna a un solo consumidor (un solo consumidor puede tener varias particiones asignadas). Los consumidores individuales que pertenecen al mismo grupo pueden ejecutar en diferentes hosts de manera distribuida.
Los grupos de consumidores se identifican a través de su group.id . Para hacer que una instancia de cliente específica sea miembro de un grupo de consumidores, es suficiente asignar los grupos group.id a este cliente, a través de la configuración del cliente:
Properties props = new Properties();
props.put("group.id", "groupName");
// ...some more properties required
new KafkaConsumer<K, V>(config);
Por lo tanto, todos los consumidores que se conectan al mismo clúster Kafka y usan el mismo group.id forman un grupo de consumidores. Los consumidores pueden dejar un grupo en cualquier momento y los nuevos consumidores pueden unirse a un grupo en cualquier momento. En ambos casos, se activa un llamado rebalanceo y las particiones se reasignan con el Grupo de consumidores para garantizar que cada partición sea procesada por un consumidor dentro del grupo.
Preste atención, que incluso un solo KafkaConsumer forma un Grupo de Consumidores consigo mismo como miembro único.
Gestión de la compensación del consumidor y tolerancia a fallos
KafkaConsumers solicita mensajes a un agente de Kafka a través de una llamada a poll() y su progreso se rastrea a través de compensaciones . Cada mensaje dentro de cada partición de cada tema tiene un llamado desplazamiento asignado: su número de secuencia lógica dentro de la partición. Un KafkaConsumer rastrea su compensación actual para cada partición que se le asigna. Preste atención, que los corredores de Kafka no están al tanto de las compensaciones actuales de los consumidores. Por lo tanto, en la poll() el consumidor debe enviar sus compensaciones actuales al intermediario, de modo que el intermediario pueda devolver los mensajes correspondientes, es decir ,. Mensajes con mayor desplazamiento consecutivo. Por ejemplo, supongamos que tenemos un solo tema de partición y un solo consumidor con la compensación actual 5. En la poll() el consumidor envía una compensación al agente y los mensajes de devolución del agente para las compensaciones 6,7,8, ...
Debido a que los consumidores rastrean sus propias compensaciones, esta información podría perderse si un consumidor falla. Por lo tanto, las compensaciones deben almacenarse de manera confiable, de modo que al reiniciar, un consumidor puede recoger su compensación anterior y volver a clasificarla donde la dejó. En Kafka, hay soporte incorporado para esto a través de confirmaciones de compensación . El nuevo KafkaConsumer puede comprometer su compensación actual a Kafka y Kafka almacena esas compensaciones en un tema especial llamado __consumer_offsets . El almacenamiento de las compensaciones dentro de un tema Kafka no solo es tolerante a fallos, sino que también permite reasignar particiones a otros consumidores durante un reequilibrio. Debido a que todos los consumidores de un Grupo de consumidores pueden acceder a todas las compensaciones confirmadas de todas las particiones, en el rebalanceo, un consumidor que obtiene una nueva partición asignada simplemente lee el desplazamiento confirmado de esta partición del tema __consumer_offsets y reanuda el lugar donde quedó el antiguo consumidor.
Cómo cometer compensaciones
KafkaConsumers puede asignar compensaciones automáticamente en segundo plano (parámetro de configuración enable.auto.commit = true ), cuál es la configuración predeterminada. Estas confirmaciones automáticas se realizan dentro de poll() ( que normalmente se llama en un bucle ). La frecuencia con la que se deben confirmar las compensaciones, se puede configurar a través de auto.commit.interval.ms . Debido a que las confirmaciones automáticas están integradas en poll() y el código de usuario llama a poll() , este parámetro define un límite inferior para el intervalo entre confirmaciones.
Como alternativa a la confirmación automática, las compensaciones también se pueden gestionar manualmente. Para esto, la confirmación automática debe estar deshabilitada ( enable.auto.commit = false ). Para la KafkaConsumers manual, KafkaConsumers ofrece dos métodos, a saber, commitSync () y commitAsync () . Como su nombre lo indica, commitSync() es una llamada de bloqueo, que se devuelve después de que las compensaciones se confirmaron correctamente, mientras que commitAsync() devuelve inmediatamente. Si desea saber si una confirmación fue exitosa o no, puede proporcionar un controlador de devolución de llamada ( OffsetCommitCallback ) un parámetro de método. Preste atención, que en ambas llamadas de confirmación, el consumidor realiza las compensaciones de la última llamada a poll() . Por ejemplo. Asumamos un solo tema de partición con un solo consumidor y la última llamada a poll() devuelve mensajes con compensaciones 4,5,6. En la confirmación, la compensación 6 se confirmará porque esta es la última compensación seguida por el cliente consumidor. Al mismo tiempo, tanto commitSync() como commitAsync() permiten tener más control sobre qué compensación desea comprometer: si usa las sobrecargas correspondientes que le permiten especificar un Map<TopicPartition, OffsetAndMetadata> el consumidor solo confirmará las compensaciones especificadas (es decir, el mapa puede contener cualquier subconjunto de particiones asignadas, y el desplazamiento especificado puede tener cualquier valor).
Semántica de compensaciones comprometidas.
Un desplazamiento confirmado indica que todos los mensajes hasta este desplazamiento ya se han procesado. Por lo tanto, como las compensaciones son números consecutivos, la compensación de X compromete implícitamente todas las compensaciones más pequeñas que X Por lo tanto, no es necesario comprometer cada desplazamiento de forma individual y, al mismo tiempo, se cometen varias compensaciones a la vez, solo se realiza la compensación más grande.
Tenga en cuenta que, por diseño, también es posible comprometer una compensación menor que la última compensación confirmada. Esto se puede hacer, si los mensajes deben leerse por segunda vez.
Procesamiento de garantías
El uso de la confirmación automática proporciona al menos una vez la semántica de procesamiento. El supuesto subyacente es que solo se llama a poll() después de que todos los mensajes entregados previamente se procesaron correctamente. Esto garantiza que no se pierda ningún mensaje porque se produce una confirmación después del procesamiento. Si un consumidor falla antes de una confirmación, todos los mensajes después de la última confirmación se reciben de Kafka y se procesan nuevamente. Sin embargo, este reintento puede dar como resultado duplicados, ya que algunos mensajes de la última llamada a poll() pueden haberse procesado, pero el error ocurrió justo antes de la llamada de confirmación automática.
Si se requiere semántica de procesamiento a lo sumo una vez, se debe deshabilitar la confirmación automática y se debe realizar un commitSync() manual directamente después de la poll() . Después, los mensajes se procesan. Esto garantiza que los mensajes se confirmen antes de que se procesen y, por lo tanto, nunca se lean una segunda vez. Por supuesto, algún mensaje podría perderse en caso de fallo.
¿Cómo puedo leer el tema desde su principio?
Existen múltiples estrategias para leer un tema desde su inicio. Para explicarlos, primero debemos entender qué sucede en el inicio del consumidor. Al inicio de un consumidor, sucede lo siguiente:
- unirse al grupo de consumidores configurado, que activa un rebalanceo y asigna particiones al consumidor
- buscar compensaciones comprometidas (para todas las particiones que se asignaron al consumidor)
- para todas las particiones con compensación válida, reanudar desde esta compensación
- para todas las particiones con compensación no válida, configure la compensación de inicio de acuerdo con el parámetro de configuración
auto.offset.reset
Iniciar un nuevo grupo de consumidores
Si desea procesar un tema desde su inicio, puede iniciar un nuevo grupo de consumidores (es decir, elegir un group.id no group.id ) y establecer auto.offset.reset = earliest . Debido a que no hay compensaciones confirmadas para un nuevo grupo, el restablecimiento de la compensación automática se activará y el tema se consumirá desde el principio. Preste atención, que en el reinicio del consumidor, si usa el mismo group.id nuevamente, no volverá a leer el tema desde el principio, sino que continuará donde lo dejó. Por lo tanto, para esta estrategia, deberá asignar un nuevo group.id cada vez que desee leer un tema desde el principio.
Reutilizar el mismo ID de grupo
Para evitar configurar un nuevo group.id cada vez que quiera leer un tema desde su inicio, puede desactivar el compromiso automático (a través de enable.auto.commit = false ) antes de iniciar el consumidor por primera vez (utilizando un group.id no utilizado) group.id y configuración auto.offset.reset = earliest ). Además, no debe realizar ninguna compensación manualmente. Debido a que las compensaciones nunca se comprometen con esta estrategia, al reiniciar, el consumidor leerá el tema desde el principio nuevamente.
Sin embargo, esta estrategia tiene dos desventajas:
- no es tolerante a fallas
- reequilibrio de grupo no funciona como se esperaba
(1) Debido a que las compensaciones nunca se comprometen, un consumidor fallido y detenido se manejan de la misma manera en el reinicio. Para ambos casos, el tema será consumido desde su inicio. (2) Debido a que los desplazamientos nunca se comprometen, al rebalancear las particiones recién asignadas serán consumidores desde el principio.
Por lo tanto, esta estrategia solo funciona para grupos de consumidores con un solo consumidor y solo debe utilizarse para fines de desarrollo.
Reutilice la misma ID de grupo y confirme
Si desea ser tolerante a fallos y / o utilizar varios consumidores en su Grupo de consumidores, es obligatorio realizar compensaciones. Por lo tanto, si desea leer un tema desde el principio, debe manipular las compensaciones confirmadas en el inicio del consumidor. Para esto, KafkaConsumer proporciona tres métodos seek() , seekToBeginning() y seekToEnd() . Mientras que seek() se puede usar para establecer un desplazamiento arbitrario, el segundo y tercer método se pueden usar para buscar el principio o el final de una partición, respectivamente. Por lo tanto, en caso de fallo y en la búsqueda de reinicio del consumidor se omitiría y el consumidor puede reanudar donde lo dejó. Para consumidor-stop-and-restart-from- seekToBeginning() , se seekToBeginning() explícitamente antes de ingresar a su bucle de poll() . Tenga en cuenta que seekXXX() solo se puede usar después de que un consumidor se unió a un grupo; por lo tanto, se requiere que realice una "encuesta ficticia" antes de usar seekXXX() . El código general sería algo como esto:
if (consumer-stop-and-restart-from-beginning) {
consumer.poll(0); // dummy poll() to join consumer group
consumer.seekToBeginning(...);
}
// now you can start your poll() loop
while (isRunning) {
for (ConsumerRecord record : consumer.poll(0)) {
// process a record
}
}