Compensación de Kafka Broker / retención de registros y restablecimiento de compensación de consumidores en modo más temprano

Descripción del problema:

Nuestro consumidor de Kafka (desarrollado en Spring Boot 2.x) se está ejecutando durante varios días. Cuando reiniciamos esos consumidores, todos los mensajes del tema se vuelven a consumir, pero solo en condiciones específicas.

Condiciones:

Nos supose que el corredor combinación / config tema ( log.retention. * , Offsets.retention. * ) Y la configuración de los consumidores ( auto.offset.reset = más temprano ) están causando este comportamiento.
Obviamente no podemos configurar al consumidor como "último" , porque si el consumidor se detiene y llegan nuevos mensajes, cuando el consumidor comience nuevamente, esos mensajes no se consumirán.

Pregunta:

¿Cuál es la configuración correcta para evitar esta situación?
En la última versión de Kafka Broker (2.x), los valores predeterminados para log.retention. * Y offsets.retention. * Son los mismos ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A + Incremento + compensaciones + retención + predeterminado + a + 7 + días )

¿Podría esta nueva configuración resolver el problema?

Configuración del consumidor ( auto.commit delegado en Spring Cloud Stream Framework):

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Configuración de corredores:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

Gracias y saludos

Respuesta 1

Tiene razón, está experimentando este problema debido a los diferentes valores para log.retention.*offsets.retention.*(7 días y 1 día respectivamente) para las versiones de Kafka anteriores a la 2.0, consulte la descripción aquí. se debe a mensajes raros que entran en su tema y a datos de compensación ya caducados.

no es totalmente correcto con respecto a tu frase Obviously we can't set consumer to "latest". si recibió los últimos mensajes menos de 1 día antes (como unas pocas horas antes), puede actualizar el auto.offset.resetvalor de forma segura latesty con el mismo ID de grupo (o application.id). en tal caso no perderá mensajes.

Como otra opción, puede cambiar el valor de retención de registro para un tema específico a 1 día. También podría actualizar el valor offsets.retention.*, pero con eso necesita probarlo desde un punto de rendimiento suyo, podría degradarse.

Respuesta: 2

Reconozco que hay una pregunta similar aquí, de un compañero que quería dividir un solo archivo en varios archivos. Lamentablemente, sin embargo, dado que hay una cierta cantidad de sobrecarga asociada con ...

He desarrollado un programa Java que contará la cantidad de archivos en la carpeta. Puede haber archivos Java o archivos de texto, de los cuales contará la cantidad de líneas de código. La idea es imprimir el ...

A continuación se muestra el código. Ahora ya está atrapada la excepción. ¿Cómo puedo escribir la prueba negativa para el mismo? ¿Para asegurarse de que el código ingrese en el bloque catch? Carga pública de ThirdPartyResponse (...

Estoy creando una EmployeeStore que almacenará nombres, nombres de dominio, identificación, dirección de correo electrónico, etc. y necesito escribir un método de edición. He buscado en Google y no puedo encontrar cómo hacerlo, ¿alguien puede ayudarme? Aquí está mi código: ...