Procesamiento Declarativo y Paralelo de Datos con la API Stream de Java

Fundamantos de la API Stream en Java

Introducida en Java 8, la API de Streams transformó radicalmente el manejo de colecciones en el ecosistema Java. Proporciona un modelo de porgramación declarativo para procesar secuencias de elementos, permitiendo operaciones complejas como filtrado, transformación, ordenamiento y reducción. Al abstraer la iteración externa, no solo mejora la legibilidad del código, sino que también facilita la ejecución concurrente aprovechando las arquitecturas multinúcleo modernas.

Clasificación de los Streams

Los flujos de datos en Java se pueden categorizar según su comportamiento de ejecución y el tipo de datos que manipulan.

Según el modo de ejecución

  • Secuenciales: Procesan los elementos uno por uno, respetando el orden de encuentro de la fuente de datos original. Operan en un único hilo.
  • Paralelos: Dividen el conjunto de datos en segmentos más pequeños y los procesan simultáneamente utilizando múltiples hilos (internamente apoyados por el ForkJoinPool). Los resultados parciales se combinan al final.
  • Infinitos: No tienen un tamaño fijo. Generan elementos bajo demanda y requieren operaciones de corto circuito (como limit) para poder ser consumidos.

Según el tipo de datos

  • Streams de objetos: La interfaz genérica Stream<T> maneja referencias a objetos.
  • Streams primitivos: Diseñados para evitar el coste de rendimiento del boxing y unboxing. Incluyen IntStream, LongStream y DoubleStream.

Flujos Secuenciales

En un flujo secuencial, las operaciones intermedias y terminales se ejecutan en el hilo principal, garantizando que el orden de procesamiento coincida con el orden de la fuente original.

import java.util.List;

public class ProcesamientoSecuencial {
    public static void main(String[] args) {
        List<Double> temperaturasCelsius = List.of(20.5, 22.0, 19.8, 25.3, 18.0);
        
        List<Double> temperaturasFahrenheit = temperaturasCelsius.stream()
                .map(celsius -> (celsius * 9/5) + 32)
                .toList();
                
        System.out.println(temperaturasFahrenheit); 
    }
}

Flujos Paralelos

Los flujos paralelos distribuyen la carga de trabajo entre varios núcleos de la CPU. Son altamente beneficiosos para grandes volúmenes de datos o tareas intensivas en procesamiento, aunque introducen consideraciones sobre la seguridad de los hilos y la preservación del orden.

import java.util.List;

public class ProcesamientoParalelo {
    public static void main(String[] args) {
        List<Integer> stockProductos = List.of(150, 300, 45, 500, 210);
        
        List<Integer> stockReajustado = stockProductos.parallelStream()
                .filter(cantidad -> cantidad > 100)
                .map(cantidad -> cantidad - 50)
                .toList();
                
        System.out.println(stockReajustado);
    }
}

Flujos Infinitos

Generan elementos dinámicamente. Dado que no tienen fin, es crucial aplicar operaciones de terminación corta para evitar bucles infinitos en la aplciación.

import java.util.UUID;
import java.util.stream.Stream;

public class FlujosInfinitos {
    public static void main(String[] args) {
        // Generación de potencias de 2
        Stream.iterate(1, n -> n * 2)
              .limit(8)
              .forEach(System.out::println);
              
        // Generación de identificadores únicos
        Stream.generate(UUID::randomUUID)
              .limit(3)
              .forEach(System.out::println);
    }
}

Creación de Streams

Desde Colecciones y Arreglos

Las colecciones proveen métodos directos, mientras que para arreglos se utilizan utilidades de la clase Arrays o Stream.

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CreacionStreams {
    public static void main(String[] args) {
        // Desde Listas
        List<String> roles = List.of("admin", "user", "guest");
        Stream<String> flujoRoles = roles.stream();
        Stream<String> flujoRolesParalelo = roles.parallelStream();

        // Desde Arreglos
        int[] edades = {25, 30, 22, 40};
        IntStream flujoEdades = Arrays.stream(edades);
        
        String[] ciudades = {"Madrid", "Lima", "Bogota"};
        Stream<String> flujoCiudades = Stream.of(ciudades);
        
        // Usando varargs
        Stream<String> flujoDirecto = Stream.of("Tokio", "Seul", "Pekin");
    }
}

Métodos Avanzados de Creación

Stream.ofNullable() (Java 9) previene errores de nulidad creando un flujo de un solo elemento o un flujo vacío.

String valorPotencial = null;
Stream<String> flujoSeguro = Stream.ofNullable(valorPotencial);
System.out.println("Elementos: " + flujoSeguro.count()); // 0

Stream.concat() une dos flujos del mismo tipo en uno solo.

Stream<String> flujoA = Stream.of("X", "Y");
Stream<String> flujoB = Stream.of("Z", "W");
Stream<String> flujoCombinado = Stream.concat(flujoA, flujoB);

El patrón Builder permite construir flujos elemento por elemento, y Stream.empty() es útil para retornar flujos vacíos como valores por defecto.

Stream<String> flujoConstruido = Stream.<String>builder()
    .add("item1")
    .add("item2")
    .build();

Stream<Object> flujoVacio = Stream.empty();

Operaciones Intermedias

Estas operaciones transforman el flujo y son perezosas; no se ejecutan hasta que se invoca una operación terminal.

Filtrado y Mapeo

import java.util.List;

public class OperacionesIntermedias {
    public static void main(String[] args) {
        List<String> palabras = List.of("java", "stream", "api", "funcional");
        
        // filter: retener palabras con más de 4 caracteres
        // map: convertir a mayúsculas
        List<String> resultado = palabras.stream()
                .filter(p -> p.length() > 4)
                .map(String::toUpperCase)
                .toList();
                
        System.out.println(resultado); // [STREAM, FUNCIONAL]
    }
}

Aplanamiento con flatMap y mapMulti

flatMap aplana estructuras anidadas. mapMulti (Java 16) ofrece una alternativa más eficiente para generar cero o múltiples elementos por cada entrada sin crear flujos intermedios.

import java.util.Arrays;
import java.util.List;

public class Aplanamiento {
    public static void main(String[] args) {
        List<String> oraciones = List.of("hola mundo", "java es genial");
        
        // flatMap
        List<String> palabrasFlat = oraciones.stream()
                .flatMap(oracion -> Arrays.stream(oracion.split(" ")))
                .toList();
                
        // mapMulti
        List<String> palabrasMulti = oraciones.stream()
                .<String>mapMulti((oracion, consumer) -> {
                    for (String palabra : oracion.split(" ")) {
                        consumer.accept(palabra);
                    }
                })
                .toList();
    }
}

Ordenamiento y Eliminación de Duplicados

import java.util.Comparator;
import java.util.List;

record Empleado(String nombre, int edad) {}

public class Ordenamiento {
    public static void main(String[] args) {
        List<Empleado> personal = List.of(
            new Empleado("Ana", 28), new Empleado("Luis", 25), new Empleado("Ana", 28)
        );
        
        List<Empleado> ordenados = personal.stream()
                .distinct() // Elimina duplicados basados en equals/hashCode
                .sorted(Comparator.comparingInt(Empleado::edad).reversed())
                .toList();
    }
}

Segmentación: limit, skip, takeWhile, dropWhile

import java.util.List;

public class Segmentacion {
    public static void main(String[] args) {
        List<Integer> puntuaciones = List.of(85, 90, 78, 92, 60, 88);
        
        // Tomar los primeros 3
        List<Integer> topTres = puntuaciones.stream().limit(3).toList();
        
        // Saltar los primeros 2
        List<Integer> sinDosPrimeros = puntuaciones.stream().skip(2).toList();
        
        // Java 9+: Tomar mientras sea mayor a 80 (se detiene en 78)
        List<Integer> altasIniciales = puntuaciones.stream()
                .takeWhile(p -> p > 80).toList(); 
                
        // Java 9+: Descartar mientras sea mayor a 80 (mantiene desde 78 en adelante)
        List<Integer> restantes = puntuaciones.stream()
                .dropWhile(p -> p > 80).toList(); 
    }
}

Operaciones Terminales

Las operaciones terminales desencadenan el procesamiento del flujo y producen un resultado final o un efecto secundario.

Recolección con collect

La operación collect es extremadamente versátil gracias a la clase Collectors.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

record Producto(int id, String categoria, double precio) {}

public class Recoleccion {
    public static void main(String[] args) {
        List<Producto> inventario = List.of(
            new Producto(1, "Electronica", 500.0),
            new Producto(2, "Ropa", 45.0),
            new Producto(3, "Electronica", 120.0)
        );
        
        // Agrupar por categoría
        Map<String, List<Producto>> porCategoria = inventario.stream()
                .collect(Collectors.groupingBy(Producto::categoria));
                
        // Particionar por precio (mayor o menor a 100)
        Map<Boolean, List<Producto>> carosVsBaratos = inventario.stream()
                .collect(Collectors.partitioningBy(p -> p.precio() > 100));
                
        // Unir nombres en un solo string
        String categoriasUnicas = inventario.stream()
                .map(Producto::categoria)
                .distinct()
                .collect(Collectors.joining(", "));
    }
}

Reducción y Búsqueda

reduce combina los elementos en un único valor. Los métodos de búsqueda devuelven un Optional.

import java.util.List;
import java.util.Optional;

public class ReduccionYBusqueda {
    public static void main(String[] args) {
        List<Integer> valores = List.of(10, 20, 30, 40);
        
        // Suma total mediante reducción
        int sumaTotal = valores.stream().reduce(0, Integer::sum);
        
        // Encontrar el máximo
        Optional<Integer> maximo = valores.stream().max(Integer::compareTo);
        
        // Buscar el primer elemento que cumpla una condición
        Optional<Integer> primeroMayorA25 = valores.stream()
                .filter(v -> v > 25)
                .findFirst();
                
        // En flujos paralelos, findAny es más eficiente que findFirst
        Optional<Integer> cualquiera = valores.parallelStream().findAny();
    }
}

Coincidencias y Conteo

import java.util.List;

public class Coincidencias {
    public static void main(String[] args) {
        List<String> correos = List.of("admin@test.com", "user@test.com", "info@test.com");
        
        // ¿Todos terminan en "@test.com"?
        boolean todosCoinciden = correos.stream().allMatch(c -> c.endsWith("@test.com"));
        
        // ¿Existe alguno que inicie con "admin"?
        boolean algunoCoincide = correos.stream().anyMatch(c -> c.startsWith("admin"));
        
        // ¿Ninguno contiene "spam"?
        boolean ningunoCoincide = correos.stream().noneMatch(c -> c.contains("spam"));
        
        // Conteo de elementos
        long totalCorreos = correos.stream().count();
    }
}

Etiquetas: java-stream-api java-8 programacion-funcional java-16 procesamiento-paralelo

Publicado el 6-12 22:09