Contexto
En el post anterior de nuestro blog «Programación Reactiva. Reactor y Webflux» ya habíamos presentado el magnífico concepto de la programación reactiva y su uso a través de las librerías Project Reactor, RxJava2, Akaa, etc.
Damos un paso más y vamos a exponer cómo de la mano de dichas herramientas, analizamos e implementamos soluciones reales en proyectos nuestros. Spoiler: parece que a nuestros clientes les gustan tiempos más eficientes.
La aplicación es enorme, ¿por dónde empezamos?
El punto vital de toda solución a todo problema reside en el seccionamiento de este en pequeñas tareas, vamos a ello:
Primero, es fundamental realizar un trabajo de campo: sumergirse en la aplicación y explorarla a fondo en busca de ineficiencias, al menos utilizándola como un usuario común. Los usuarios frecuentes suelen estar familiarizados con las secciones más problemáticas, pero existen muchas otras áreas donde, con una solución adecuada, tiempos de respuesta de 3 segundos podrían reducirse a 0.5 segundos. Puede parecer un detalle menor, pero con el tiempo, y al acumularse otros casos similares, como desarrolladores y testers que experimentan la aplicación de primera mano, lo agradeceréis incluso vosotros.
Como segundo plato, tenemos que seleccionar los casos más notables, nos interesa empezar por lo que más llama la atención y a partir de ahí ir poco a poco puliendo la eficiencia general.
¿Pero qué es lo que estamos buscando cambiar exactamente? Como mencioné, hay casos que destacan claramente, pero también existen otros en los que uno podría asumir que el tiempo de respuesta es simplemente el que tiene que ser, dado que se manejan grandes volúmenes de datos. Sin embargo, esto no siempre tiene por qué ser así.
Existen partes de la aplicación web donde se pueden paralelizar llamadas a la base de datos, aplicar concurrencia en cálculos costosos, dejar en segundo plano la obtención de respuestas de un servicio externo o incluso de nuestros propios procesos. Además, gracias a los «Fluxes», podríamos enviar notificaciones en tiempo real al front-end sin necesidad de realizar llamadas periódicas. Si fuera necesario, podríamos incluso implementar técnicas de Backpressure cuando dependemos de un productor en tiempo real (aunque, siendo honestos, hasta ahora no nos ha hecho falta). Esto es básicamente lo que buscamos: pequeñas optimizaciones.
A domar los leones con WebFlux
Ya tenemos a los culpables, nos han fastidiado frecuentemente y toca hacer justicia. Por un lado tenemos una mega consulta, En entornos de producción, esta funcionalidad maneja una magnitud de registros del orden de cientos de miles. Por supuesto, la consulta está paginada y no obtenemos todos esos datos de una sola vez, pero esto no es la solución definitiva. Estos registros son costosos de obtener porque resultan de uniones de datos provenientes de distintas tablas.
Nuestra solución:
Flux<Tuple2<Integer, Integer>> partialIntervalsFlux =
Flux.generate(
intervalIncognitos::getT1,
(state, sink) -> {
if(state <= intervalIncognitos.getT2()) {
Integer upperLimit = Math.min(state + 1000, intervalIncognitos.getT2()+1);
sink.next(Tuples.of(state, upperLimit));
return upperLimit;
} else {
sink.complete();
return state;
}
});
ParallelFlux<List<BusquedaIncognitoDTO>> fluxBusquedaIncognitoDtos =
partialIntervalsFlux.parallel().runOn(Schedulers.boundedElastic()),
map(interval -> incognitoDAO.listar(incognitoFilter, interval.getT1(), interval.getT2(), idCentro));
fluxBusquedaIncognitoDtos.subscribe( listaBusquedaIncognito -> {
/* Procesamos los elementos de la lista de forma concurrente */
});
Por el lado del servicio, aplicamos la concurrencia, el código divide el rango de registros en intervalos de hasta 1000 unidades, para posteriormente en paralelo realizar las modificaciones necesarias para cada chunk de registros a través de la subscripción aplicada a «fluxBusquedaIncognitoDtos».
Por otro lado, «partialIntervalsFlux», una vez inicializado, se encarga de emitir valores en intervalos (Integer) a través del uso de sink.next() hasta que finaliza vía sink.complete().
A través del uso de Schedulers.boundedElastich dejamos a la entidad Scheduler de Reactor decidir el número de threads a ser ejecutados, por default es el número de cores de la CPU x 10.
Por el lado del DAO eliminamos la paginación por ‘sql’ y la podemos simplificar de ‘joins’ dado que pasamos el procesamiento a java, para poder aplicar concurrencia, de esta forma para una página podemos procesar el número de registros que queramos a la vez, ¡ya que son independientes entre sí! Teniendo en cuenta los requisitos de memoria que se aplica para en este caso la ejecución de java.
De esta forma y para nuestra aplicación se ha conseguido reducir el tiempo de respuesta de la consulta en un 75%.
Otra optimización: Caso de envío de mensajería, nuestra aplicación era dependiente de la respuesta para un envío a un ws externo de un mensaje para que este posteriormente fuese enviado con formato sms o email. En este sentido, estábamos esperando a la respuesta para desbloquear al usuario la parte frontal para que pudiese seguir utilizando la aplicación.
Tras la revisión con Webflux decidimos realizar una aproximación que constaba de varios hechos:
- No necesitamos realizar una espera activa del envío del mensaje, enviamos y nos olvidamos, no hacemos esperar al usuario.
- Realmente no nos olvidamos, podemos dejar el estado del envío del mensaje en la parte frontal en algún lado, de esta forma en el momento que haya finalizado el envío podríamos ver si ha dado error o no a través de un texto a color o lo que fuera.
try {
enviadoSinErrores = servicioEmailsSMS.decidirYEnviar(comunicacion);
} catch (Exception e) {
throw new incognitoException(new incognitoException(Constantes.INCAPAZ_ENVIAR_COMUNICACION, e));
}
if (Boolean.TRUE.equals(enviadoSinErrores)) {
/* Devolvemos OK */
} else {
/* Devolvemos KO */
}
Para la respuesta http de la llamada de envío del front nos centramos únicamente en si hemos podido enviar el mensaje y retornamos.
Posteriormente en la capa de servicio y a través de un hilo concurrente realizamos el envío y nos despegamos del hilo principal para esperar la respuesta y procesarla en caso de éxito o error.
Callable<Response> callable = () -> enviarComunicado(mb);
LogUtils.writeLogOperaciones(ConstantesLogs.NivelTraza.INFO, "PRUEBA DE CONCEPTO: SE HA LLAMADO A COMUNICACIÓN DE FORMA CONCURRENTE");
Mono.fromCallable(callable).subscribeOn(Schedulers.parallel())
.doOnSubscribe(subscription -> LogUtils.writeLog(Constantes.NivelTraza.INFO, "Subscribed"))
.doOnSuccess(res -> LogUtils.writeLog(Constantes.NivelTraza.INFO, "Success"))
.doOnError(error -> LogUtils.writeLog(Constantes.NivelTraza.ERROR, "Error" + error))
.subscribe(res -> {
try {
byte[] respuestaMono = procesarRespuesta(res);
LogUtils.writeLog(Constantes.NivelTraza.INFO,
"PRUEBA DE CONCEPTO: SE HA RECIBIDO LA COMUNICACIÓN DE FORMA CONCURRENTE");
} catch (incognitoException e) {
LogUtils.writeLog(Constantes.NivelTraza.ERROR, e.getMessage());
throw new RuntimeException("Error interno. Contacte con el administrador", e);
}
}, error -> {
LogUtils.writeLog(Constantes.NivelTraza.INFO,
"PRUEBA DE CONCEPTO: SE HA RECIBIDO LA COMUNICACIÓN DE FORMA CONCURRENTE");
LogUtils.writeLog(Constantes.NivelTraza.ERROR, error.getMessage());
throw new RuntimeException("Error interno. Contacte con el administrador", error);
});
Por último, tratamos un campo derivado de 5 tablas distintas aprox. para su obtención:
Tenemos un campo el cual es un entero responsable de indicar la disponibilidad de agentes en un horario en concreto. Dicha disponibilidad es calculada por los festivos de un establecimiento, por los de otro establecimiento, por el número de ausencias de agentes para ese horario, etc.
En resumen, estamos lidiando con una combinación de muchas consultas y procesamientos independientes de variables, que al final se suman para obtener el resultado completo que necesitamos. Esto es ideal para Webflux: podemos lanzar varios hilos para procesar cada variable de forma independiente y simultánea, y luego esperar a que todos los hilos terminen para completar el cálculo de cada horario. Paralelizando las tareas y optimizando el proceso general.
/* LLAMAMOS AL CALCULO DE CADA DEPENDENCIA */
Mono<List<Dependencia1>> monoDependencia1 =
Mono.fromCallable(() -> servicioDependencia1.procesarYlistar(filter.getFecha(), currentId)).subscribeOn(Schedulers.boundedElastic());
Mono<List<Dependencia2>> monoDependencia2 =
Mono.fromCallable(() -> servicioDependencia2.procesarYlistar(filter, currentId)).subscribeOn(Schedulers.boundedElastic());
Mono<List<Dependencia3>> monoDependencia3 =
Mono.fromCallable(() -> servicioDependencia3.procesarYlistar(filter, currentId)).subscribeOn(Schedulers.boundedElastic());
Mono<List<Dependencia4>> monoDependencia4 =
Mono.fromCallable(() -> servicioDependencia4.procesarYlistar(filter, currentId)).subscribeOn(Schedulers.boundedElastic());
Mono<List<Dependencia5>> monoDependencia5 =
Mono.fromCallable(() -> servicioDependencia5.procesarYlistar(filter, currentId)).subscribeOn(Schedulers.boundedElastic());
/* ESPERAMOS AL CALCULO DE TODAS LAS DEPENDENCIAS PARA AVANZAR */
Mono<Tuple5<List<Dependencia1>, List<Dependencia2>, List<Dependencia3>, List<Dependencia4>, List<Dependencia5>>> combinedMono =
Mono.zip(
monoDependencia1,
monoDependencia2,
monoDependencia3,
monoDependencia4,
monoDependencia5
).blockOptional();
/* OBTENEMOS EL RESULTADO DEL CALCULO DE LAS DEPENDENCIAS A LA VEZ */
List<Dependencia1> dependencias1 = combinado.map(Tuple5::getT1).orElse(new ArrayList<>());
List<Dependencia2> dependencias2 = combinado.map(Tuple5::getT2).orElse(new ArrayList<>());
List<Dependencia3> dependencias3 = combinado.map(Tuple5::getT3).orElse(new ArrayList<>());
List<Dependencia4> dependencias4 = combinado.map(Tuple5::getT4).orElse(new ArrayList<>());
List<Dependencia5> dependencias5 = combinado.map(Tuple5::getT5).orElse(new ArrayList<>());
Conclusión
La falta de optimización en nuestros procesos puede resultar en retrasos significativos, lo cual conlleva costos adicionales y aumenta el tiempo necesario para pruebas y uso. Estos retrasos no solo afectan la eficiencia general, sino que también incrementan los recursos requeridos y el tiempo dedicado a la resolución de problemas. Utilizando WebFlux y Reactor, podemos abordar estos desafíos de manera efectiva. Estas herramientas nos permiten optimizar y reducir los tiempos en una variedad de procesos, desde los más simples hasta los más complejos. Aunque el impacto de la optimización en cada proceso individual puede parecer pequeño, la verdadera diferencia se manifiesta en la suma total de estos tiempos reducidos. Al mejorar cada componente del sistema, conseguimos una mejora global en la eficiencia y el rendimiento, lo que se traduce en una experiencia de usuario más ágil y costos operativos más bajos.