Integración de Spring: vuelva a intentar la configuración con instancias múltiples

Estoy ejecutando 4 instancias de aplicaciones basadas en Spring Boot Integration en 4 servidores diferentes. El proceso es:

  1. Lea los archivos XML uno por uno en una carpeta compartida.
  2. Procese el archivo (verifique la estructura, el contenido ...), transforme los datos y envíe un correo electrónico.
  3. Escriba un informe sobre este archivo en otra carpeta compartida.
  4. Eliminar el archivo procesado con éxito.

Estoy buscando una solución segura y sin bloqueo para procesar estos archivos.

Casos de uso:

  • Si una instancia falla mientras lee o procesa un archivo (sin terminar la cadena de integración): otra instancia debe procesar el archivo o la misma instancia debe procesar el archivo después de que se reinicie.
  • Si una instancia está procesando un archivo, las otras instancias no deben procesar el archivo.

He creado este archivo de configuración XML de Spring Integration (incluye metadatostore JDBC con una base de datos H2 compartida):

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
    <int:queue/>
</int:channel>

<!-- Input -->
<int-file:inbound-channel-adapter 
    id="inputFilesAdapter"
    channel="inputFilesChannel"
    directory="file:${input.files.path}" 
    ignore-hidden="true"
    comparator="lastModifiedFileComparator"
    filter="compositeFilter">
   <int:poller fixed-rate="10000" max-messages-per-poll="1"  task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="1"/>

<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
</bean>

<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="jdbcDataSource"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
</bean>

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                <constructor-arg index="0" ref="jdbcMetadataStore"/>
                <constructor-arg index="1" value="files"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
    <int:service-activator ref="fileActivator" method="fileRead"/>
    <int:service-activator ref="fileActivator" method="fileProcess"/>
    <int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>

<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

<int-file:outbound-channel-adapter 
    id="outputFilesChannel" 
    directory="file:${output.files.path}"
    filename-generator-expression ="payload.name">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
             <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>

</beans>

Problema: con varios archivos, cuando 1 archivo se procesa con éxito, la transacción confirma los otros archivos existentes en el metadatastore (tabla INT_METADATA_STORE). Entonces, si la aplicación se reinicia, los otros archivos nunca se procesarán (funciona bien si la aplicación se bloquea cuando se procesa el primer archivo). Parece que solo se aplica para leer archivos, no para procesar archivos en una cadena de integración ... ¿Cómo gestionar la transacción de reversión en JVM crash archivo por archivo?

Se agradece mucho cualquier ayuda. Me va a volver loco :(

Gracias !

Ediciones / Notas:

  • Inspirado en https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml

  • He actualizado mi configuración con la respuesta de Artem Bilan. Y elimine el transactionalbloque en el pollerbloque: tuve un conflicto de transacciones entre instancias (excepciones de bloqueos de tabla feos). Aunque el comportamiento fue el mismo.

  • He probado sin éxito esta configuración en el pollerbloque (mismo comportamiento):

    <int:advice-chain>
        <tx:advice id="txAdvice" transaction-manager="transactionManager">
            <tx:attributes>
                <tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
    
  • Tal vez una solución basada en el patrón de integración empresarial del receptor idempotente podría funcionar. Pero no logré configurarlo ... No encuentro documentación precisa.

Respuesta 1

No deberías usar a PseudoTransactionManager, sino en su DataSourceTransactionManagerlugar.

Como utiliza a JdbcMetadataStore, participará en la transacción y si falla el flujo descendente, la entrada en el almacén de metadatos también se revertirá.

Respuesta: 2

Estoy tratando de clonar una lista en una nueva lista y establecer una propiedad en la nueva lista. Estoy tratando de usar Java8 Stream, ya que simplifica la clonación. Mi código funciona pero le da olor a este código de Sonar: Local ...

Acceda a archivos PDF desde 'res / raw' o carpeta de activos mediante programación para analizar con los métodos dados Explicación: En este momento, este programa accede a un archivo desde un administrador de archivos que toma los archivos seleccionados ...

Preguntas: Hola, quiero insertar un mapa dinámico de Google en mi sitio web por dirección IP. Obtuve los datos lat / long usando herramientas de IPlocation. Todavía no pude conectar los datos lat / log de IPlocation a google ...

Estoy haciendo mi proyecto "Sistema de reemplazo facial" en Java. Para eso, primero debo verificar si una imagen está subexpuesta o sobreexpuesta, y aclararla o hacerla menos brillante, respectivamente. Hay un ...