Descripción del problema:
Nuestro consumidor de Kafka (desarrollado en Spring Boot 2.x) se está ejecutando durante varios días. Cuando reiniciamos esos consumidores, todos los mensajes del tema se vuelven a consumir, pero solo en condiciones específicas.
Condiciones:
Nos supose que el corredor combinación / config tema ( log.retention. * , Offsets.retention. * ) Y la configuración de los consumidores ( auto.offset.reset = más temprano ) están causando este comportamiento.
Obviamente no podemos configurar al consumidor como "último" , porque si el consumidor se detiene y llegan nuevos mensajes, cuando el consumidor comience nuevamente, esos mensajes no se consumirán.
Pregunta:
¿Cuál es la configuración correcta para evitar esta situación?
En la última versión de Kafka Broker (2.x), los valores predeterminados para log.retention. * Y offsets.retention. * Son los mismos ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A + Incremento + compensaciones + retención + predeterminado + a + 7 + días )
¿Podría esta nueva configuración resolver el problema?
Configuración del consumidor ( auto.commit delegado en Spring Cloud Stream Framework):
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [server1:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumer_group1
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Configuración de corredores:
log.retention.ms = 86400000
log.retention.minutes = 10080
log.retention.hours = 168
log.retention.bytes = -1
offsets.retention.ms = 864000000
offsets.retention.minutes = 14400
offsets.retention.hours = 240
unclean.leader.election.enable = false
log.cleaner.enable = true
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.retention.check.interval.ms = 300000
log.cleaner.delete.retention.ms = 604800000
Gracias y saludos
Tiene razón, está experimentando este problema debido a los diferentes valores para log.retention.*
y offsets.retention.*
(7 días y 1 día respectivamente) para las versiones de Kafka anteriores a la 2.0, consulte la descripción aquí. se debe a mensajes raros que entran en su tema y a datos de compensación ya caducados.
no es totalmente correcto con respecto a tu frase Obviously we can't set consumer to "latest"
. si recibió los últimos mensajes menos de 1 día antes (como unas pocas horas antes), puede actualizar el auto.offset.reset
valor de forma segura latest
y con el mismo ID de grupo (o application.id
). en tal caso no perderá mensajes.
Como otra opción, puede cambiar el valor de retención de registro para un tema específico a 1 día. También podría actualizar el valor offsets.retention.*
, pero con eso necesita probarlo desde un punto de rendimiento suyo, podría degradarse.
Reconozco que hay una pregunta similar aquí, de un compañero que quería dividir un solo archivo en varios archivos. Lamentablemente, sin embargo, dado que hay una cierta cantidad de sobrecarga asociada con ...
He desarrollado un programa Java que contará la cantidad de archivos en la carpeta. Puede haber archivos Java o archivos de texto, de los cuales contará la cantidad de líneas de código. La idea es imprimir el ...
A continuación se muestra el código. Ahora ya está atrapada la excepción. ¿Cómo puedo escribir la prueba negativa para el mismo? ¿Para asegurarse de que el código ingrese en el bloque catch? Carga pública de ThirdPartyResponse (...
Estoy creando una EmployeeStore que almacenará nombres, nombres de dominio, identificación, dirección de correo electrónico, etc. y necesito escribir un método de edición. He buscado en Google y no puedo encontrar cómo hacerlo, ¿alguien puede ayudarme? Aquí está mi código: ...