Flink pierde líder y se estrella

Estoy ejecutando una aplicación de procesamiento de transmisión en un entorno LocalStream (clúster de parpadeo integrado). Procesé un conjunto de datos específico usando mi código con éxito varias veces. Quería volver a ejecutar la aplicación ayer después de hacer algunas modificaciones en la lógica de procesamiento, sin embargo, después de aproximadamente 3/4 de camino a través del procesamiento de datos, parece que el clúster de parpadeo se bloquea sin ningún motivo. Revise el registro condensado: mis comentarios se insertan entre corchetes <>:

2018-02-09 12:04:05,146 [INFO] from a.b.l.f.MultiS3FileSource in Source: General source (1/1) - inserting 266574 events
2018-02-09 12:10:55,094 [ERROR] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11020 - class org.apache.flink.runtime.client.JobSubmissionClientActor received unknown message: 
2018-02-09 12:10:55,245 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Process -> Detection(7/8) switched to CANCELED ) because there is currently no valid leader id known.
2018-02-09 12:10:55,268 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Enrichment-> Flat Map(7/8) switched to CANCELED ) because there is currently no valid leader id known.
... <similar messages for all the processing steps>
2018-02-09 12:10:55,509 [ERROR] from o.a.f.s.r.t.StreamTask in PartialAggregations-> Sink: CassandraSink (1/8) - Error during disposal of stream operator.
java.lang.InterruptedException: null <because its interrupting a future>
... <for all of my sinks - these are custom, not the flink cassandra connectors>

El primer mensaje de información es sobre mis fuentes leyendo datos de s3 y recopilándolos en flink.

Luego, el primer error es producido por: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java#L137

y las advertencias son producidas por: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java#L115

El último error está en mi código, pero es causado por un parpadeo que intenta derribar el trabajo, por lo que no debería ser la causa original del error.

Puedo proporcionar información adicional, pero no estoy seguro de qué sería relevante.

El primer error parece ser lo que conecta todo el accidente. ¿Cómo sería posible que JobSubmissionClientActor tuviera un getLeaderSessionID nulo? ¿Qué tipo de mensajes espera JobSubmissionClientActor si flink se está ejecutando incrustado? Me parece que todos los mensajes que puede recibir son mensajes relacionados con el envío de trabajos. ¿Debería eso ser posible en modo incrustado? ¿Cómo puedo prevenir este bloqueo?

ACTUALIZACIÓN: Creo que he malinterpretado el registro de errores. Cuando ejecuté la ejecución una vez más, obtuve un orden de eventos ligeramente diferente. En la ejecución anterior, solo obtuve errores relacionados con la eliminación de la transmisión, sin una causa obvia para que finalice la transmisión, ya que el último error parece no estar incluido en mi archivo de registro (aunque se imprimió en stdout). Este error está debajo, los errores anteriores son similares a los de la ejecución anterior (errores relacionados con la eliminación de la transmisión).

[error] Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: JobClientActor seems to have died before the JobExecutionResult could be retrieved.[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:285)
[error]         at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
[error]         at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
[error]         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:108)
[error]         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
[error]         at a.b.l.flink.FlinkIngestPrototype$.run(FlinkIngestPrototype.scala:90)
[error]         at a.b.l.flink.FlinkIngestPrototype$.main(FlinkIngestPrototype.scala:43)
[error]         at a.b.l.flink.FlinkIngestPrototype.main(FlinkIngestPrototype.scala)
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
[error]         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
[error]         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
[error]         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[error]         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error]         at scala.concurrent.Await$.result(package.scala:190)
[error]         at scala.concurrent.Await.result(package.scala)
[error]         at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:273)
[error]         ... 9 more

He rastreado el error de ejecución a lo siguiente:

  1. El objeto JobClient hace sonar al actor cliente del trabajo si ya está hecho y si no es así, simplemente lo hace sonar si está vivo. El ping vivo es: https://github.com/apache/flink/blob/62a777bc8ddfb4e34d7beaf7091a90b0bcc70c51/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java#L273

  2. Este ping expira y envía una píldora de veneno al actor de trabajo y eso da como resultado todos los diferentes errores de eliminación.

Me he encontrado con algunos problemas con futuros antes de que se interrumpieran con tiempos de espera más cortos de una manera no determinista. He depurado el problema un poco y creo que fue debido a algunas pausas muy largas de GC (o algo similar). Una ilustración de cómo se sincronizan los tiempos de espera con las pausas del GC: https://imgur.com/a/9mMvN . Creo que esta podría ser la causa de este tiempo de espera también. Esta es mi configuración de GC:

"-XX:-UseParallelGC",
"-XX:-UseConcMarkSweepGC",
"-XX:+UseG1GC",

que, según la mayoría de las fuentes, debería dar lugar a pausas muy breves de GC (menos de un segundo). ¿Alguien tiene alguna experiencia con pausas de GC muy largas en Flink? ¿Podría ser esto un problema relacionado de alguna manera con HW? Estoy ejecutando la aplicación en una instancia de EC2 AWS.

Respuesta 1

Cuando realiza una solicitud de descanso con RestAssured, parece esperar una respuesta. Necesito hacer una solicitud POST en RestAssured y luego, mientras espera una respuesta, necesito hacer una solicitud GET. YO'...

Por ejemplo, para un juego tengo algo de habilidad, que es un objeto de datos: habilidad de interfaz pública {public String getName (); } Public class Attack implementa Skill {public String getName () {return "Attack"; ...

Spring Boot puede construir archivos ejecutables .jar o .war. ¿Hay alguna diferencia entre ellos, además de la extensión del archivo? La documentación indica que debe configurar su proyecto para construir un jar o ...

¿Existe una manera elegante de afirmar que los números son iguales mientras se ignoran sus clases? Quiero usarlo en el marco de pruebas JUnit pero, por ejemplo, Assert.assertEquals (1,1L) falla con java.lang ...