¿Cómo puedo agregar elementos en un flujo por grupo / cómo reducir en grupo?

Suponga que tiene un flujo de objetos con la siguiente estructura:

class Element {
  String key;
  int count;
}

Ahora imagine que esos elementos fluyen en un orden de clasificación predefinido, siempre en grupos de una clave, como

{ key = "firstKey",  count=123}
{ key = "firstKey",  count=1  }
{ key = "secondKey", count=4  }
{ key = "thirdKey",  count=98 }
{ key = "thirdKey",  count=5  }
 .....

Lo que quiero hacer es crear un flujo que devuelva un elemento para cada distinto keyy sumado countpara cada grupo de claves. Entonces, básicamente, como una reducción clásica para cada grupo, pero el uso del reduceoperador no funciona, ya que solo devuelve un único elemento y quiero obtener un flujo con un elemento para cada tecla distinta.

Usar bufferUntilpodría funcionar, pero tiene el inconveniente de que tengo que mantener un estado para verificar si keyha cambiado en comparación con el anterior.

Usar groupByes una exageración, ya que sé que cada grupo ha llegado a su fin una vez que se encuentra una nueva clave, por lo que no quiero guardar nada en caché después de ese evento.

¿Es posible tal agregación utilizando Flux, sin mantener un estado fuera del flujo?

Respuesta 1

¿En este eclipse de cadena que muestra una advertencia de código muerto en "No coincide"? Cadena b = ("goodString") == ("goodString")? "Condición Macth": "No coincide"; ¿Comprueba la cadena incluso antes de compilar? Cadena a = ...

Estoy creando una aplicación que accede a una base de datos y actualmente tengo un SWT Shell que muestra la tabla principal. Desde esta tabla, los usuarios pueden abrir registros individuales y desde el interior de los registros el usuario ...

Esta es una pregunta de seguimiento de mi pregunta anterior. Estoy tratando de implementar un NavigableMap definido como NavigableMap <Timestamp, Event>. Necesitaría que este mapa sea como un caché para mí. Cada ...

Tengo una aplicación Java EE que utiliza JAX-WS 2.2 y tengo que implementarla en WebSphere Application Server 7, pero parece que esta versión de WAS es compatible con hasta JAX-WS 2.1. ¿Hay alguna forma de ...