Crear un procesador personalizado en Nifi y enviarlo a PublishKafka

¿Puede alguien proporcionarme un ejemplo simple para configurar un archivo de flujo en un procesador Nifi personalizado para que la carga se pueda enviar a través del procesador PublishKafka?

Tengo un protocolo de mensajería heredado para el que escribí un procesador personalizado. Estructura bastante simple, solo un MessageID (String) y el MessageBody (byte []). Mi procesador personalizado maneja la entrada con los mensajes que se reciben bien. Ahora estoy tratando de poner estos datos en un archivo de flujo para que puedan enviarse al procesador PublishKafka, pero he tenido problemas para encontrar recursos en línea sobre cómo hacerlo. Aquí está mi fragmento de código actual de la parte relevante:

    try {
         this.getLogger().info("[INFO - ListenMW] - Message Received: " +
                            data.getMsgID().toString() + " Size: " +
                            data.getMsgData().length);
         this.currentSession.adjustCounter("MW Counter", 1, true);

         // Setup the flowfile to transfer
         FlowFile flowfile = this.currentSession.create();
         flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
         flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));

         this.currentSession.transfer(flowfile, SUCCESS);


     }catch(Exception e) {
          this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
          this.currentSession.adjustCounter("MW Failure", 1, true);
     }

No he podido determinar qué atributo (s) usar para msgID y msgData, así que creé el mío por ahora. Vi una publicación en la que alguien recomendaba construir su propia estructura json y enviarla como su carga útil, pero de nuevo, ¿qué atributo enviaría a través de ella para que se asigne correctamente al mensaje kafka? Soy bastante nuevo en Kafka y hasta ahora solo he experimentado con casos de prueba rudimentarios, así que perdone mi ignorancia por cualquier suposición equivocada.

Gracias por cualquier orientación! Estoy usando Kafka2.0.1 y el procesador PublishKafka_2.0.

Respuesta 1

Estoy usando un XPage como agente (XAgent) que hace una llamada SSJS en algunas clases de Java almacenadas como elementos de diseño de Java. Quiero que los procesos que instiga XPage estén en el contexto de ...

Estoy creando una aplicación de Android donde los usuarios pueden encontrar los lugares más cercanos de acuerdo con su ubicación actual y también, poder agregar lugares a la base de datos. Entiendo que tendré que usar ...

¿Es diferente que nos refiramos a un objeto de clase que no usa parcelable? Cuando valoramos una parcela, tenemos que hacer un objeto de esa clase. Lo mismo es cierto al leerlo. Si realmente somos ...

Actualmente estoy trabajando en un pequeño proyecto impartido por un curso de edX. Me quedé con una cosa, tomar valores de los usuarios y almacenarlos en una matriz 2D. Sin embargo, los valores deben estar en un rango, y entonces ...