Java 8 to 10 - (2) 함수형 데이터 처리

스트림

스트림의 특성

  • 스트림 API의 특성을 한 문장으로 요약하자면 선언형이며 조립할 수 있고 병렬화가 가능하다는 것이다.
  • 스트림은 여러 종류의 데이터 소스로 부터 연속된 요소를 소비(일회성)하며 반복을 내재화(내부 반복)하여 로직을 단순화하고 손쉽게 병렬화 할 수 있다.

스트림 연산

  • 스트림 API에서는 중간 연산(intermediate op), 최종 연산(terminal op)의 두가지 연산 종류를 제공한다.
  • 중간 연산
    • 중간 연산은 스트림을 반환하며 여러 연산을 연결하여 복잡한 질의를 만들 수 있다.
    • 중간 연산은 연결시에 실제로 연산이 이루어지지 않으며 최종 연산을 통해서 한 번에 연산을 처리한다 (lazy operation).
    • 중간 연산은 내부에서 최적화될 수 있다 (short circuit, loop fusion).
  • 최종 연산
    • 최종 연산은 스트림 파이프라인에서 결과를 도출한다.
    • 결과는 컬렉션 collect, 값 count, 사이드 이펙트 forEach 등으로 도출될 수 있다.

스트림 활용

필터링

  • .filter(predicate): 조건을 만족하는 요소만 필터링 한다.
  • .distinct(): 고유한 요소로 이루어진 스트림을 반환한다.

슬라이싱

  • .takeWhile(predicate): 조건을 만족하는 요소까지만 반환하는 스트림을 생성한다.
  • .dropWhile(predicate): 조건을 만족하는 요소까지 버리고 나머지 요소를 반환하는 스트림을 생성한다.
  • .limit(maxSize): 지정된 크기만큼 요소를 반환하는 스트림을 생성한다.
  • .skip(n): 지정된 개수만큼 요소를 버리고 나머지를 반환하는 스트림을 생성한다.

매핑

  • .map(expr): 각 요소에 지정된 함수를 적용하여 반환된 값을 돌려주는 스트림을 생성한다.
  • .flatMap(expr): 각 요소에 지정된 함수를 적용하여 반환된 스트림을 합쳐서 하나의 스트림으로 돌려준다.

검색과 매칭

  • .anyMatch(predicate): 적어도 하나의 요소가 조건에 일치하는지 확인한다.
  • .allMatch(predicate): 모든 요소가 조건에 일치하는지 확인한다.
  • .noneMatch(predicate): 모든 요소가 조건에 일치하지 않는지 확인한다.

요소 검색

  • .findAny(): 스트림에서 임의의 요소를 반환한다.
  • .findFirst(): 스트림에서 첫번째 요소를 반환한다.
.findAny()와 .findFirst()의 차이? 스트림이 병렬로 실행될 경우 첫번째 요소를 찾기 위해서는 경우에 따라 모든 결과를 계산해야될 필요가 있다. 따라서 스트림에서 순서와 상관없이 하나의 요소를 가져오는 것이 필요하다면 .findAny()를 사용하여 효율적으로 데이터를 가져올 수 있다.

리듀싱

  • .reduce(initValue, reducer): 초기값과 리듀서 함수를 이용하여 스트림에 있는 요소를 합쳐서 하나의 결과를 만들어낸다. 리듀서 함수는 BinaryOperator<T> 타입으로 첫번째 인자는 누적값, 두번째 인자는 스트림의 요소가 주어진다.
  • .reduce(reducer): 초기값이 없는 경우 결과가 Optional<T> 로 반환된다. 스트림이 비어있는 경우 값이 없는 경우가 존재하기 때문이다.

기본형 특화 스트림

  • .mapToInt(expr), .mapToLong(expr), .mapToDouble(expr): 람다식을 각 요소에 적용하여 나온 결과를 기본형 특화 스트림으로 반환한다.
  • .boxed(): 기본형 특화 스트림을 다시 객체 스트림으로 변환한다.
  • .min(), .max(): 기본형 특화 스트림에서 최소, 최대값을 반환한다. 단, 반환되는 값은 Optional<T>의 기본형 특화 타입인 OptionalInt, OptionalLong, OptionalDouble 로 반환된다.

범위

  • IntStream.range(start, end): [start, end) 범위의 숫자를 돌려주는 스트림을 생성한다.
  • IntStream.rangeClosed(start, end): [start, end] 범위의 숫자를 돌려주는 스트림을 생성한다.

스트림 생성

  • Stream.of(values): 하나 또는 두개 이상의 값들로 스트림을 생성한다.
  • Stream.empty(): 빈 스트림을 생성한다.
  • Stream.ofNullable(value): 값이 null인 경우 빈 스트림을, 아닌 경우 값이 하나인 스트림을 생성한다.
  • Arrays.stream(array): 배열로 스트림을 생성한다.
  • Files.lines(path): 파일로 부터 한 줄씩 읽어서 스트림을 생성한다.

언바운드 스트림 (unbounded stream)

  • .iterate(initValue, [predicate,] iterator): 지정된 초기값으로 부터 iterator UnaryOperator<T>를 적용하여 무한한 요소를 가지는 스트림을 생성한다. predicate를 지정할 경우 조건을 만족할 때까지만 요소를 생성한다. 조건을 지정하는 대신 .takeWhile을 사용해도 된다.
  • .generate(supplier): 지정된 값 생성 함수 Supplier<T>를 이용하여 무한한 요소를 가지는 스트림을 생성한다.

컬렉터

개요

  • 컬렉터는 .collect 최종 연산에서 결과를 생성하기 위해 인자로 주어지는 함수이다.
  • java.util.stream.Collectors 클래스에서 미리 정의된 컬렉터를 제공한다.
    • 보통 import static java.util.stream.Collectors; 와 같은 형식으로 import 해서 사용한다.
  • 컬렉터의 유형으로는 (1) 요약 (리듀스), (2) 그룹화, (3) 분할의 세가지 유형이 있다.

요약

  • toList(), toSet(), toCollection(factory): 스트림의 요소를 List<T>, Set<T>, 그리고 컬렉션 생성 함수 factory 로 생성된 컬렉션에 추가하는 컬렉터를 생성한다.
  • summingInt(expr), summingLong(expr), summingDouble(expr): 스트림 요소에 람다를 적용한 결과를 모두 더하는 컬렉터를 생성한다.
  • averagingInt(expr), averagingLong(expr), averagingDouble(expr): 스트림 요소에 람다를 적용한 결과를 평균내는 컬렉터를 생성한다.
  • summarizingInt(expr), summarizingLong(expr), summarizingDouble(expr): 스트림 요소에 람다를 적용한 결과의 통계를 내는 컬렉터를 생성한다. 생성된 통계 객체는 개수 count, 합 sum, 최소 min, 최대 max, 평균 average 값을 제공한다.
  • joining([delimiter]): 스트림의 모든 문자열을 연결하여 하나의 문자열로 반환하는 컬렉터를 생성한다. 구분자 delimiter 문자열을 넣으면 각 문자열 사이에 구분자를 넣어서 모든 문자열을 연결한 결과를 돌려준다.
  • reducing([initValue, [converter,]] reducer): 스트림의 요소에 리듀서를 적용하여 요약된 결과를 돌려주는 컬렉터를 생성한다. 초기값과 변환 함수를 모두 생략한 경우 스트림의 첫번째 요소를 초기값으로 하여 연산을 수행하며 결과 타입이 Optional<T>이 된다. 초기값만 주고 변환 함수를 생략하는 것도 가능하며 이때는 스트림의 요소가 그대로 리듀서에서 사용된다.

그룹화

  • groupingBy(classifier[, [mapFactory,] downstream]): 분류 함수 classifier 의 결과값을 기준으로 스트림의 요소를 분리하여 Map<K, V>에 저장한다. 맵의 경우 맵 객체 생성 함수 mapFactory가 주어지면 해당 함수를 사용하며 기본적으로는 상황에 따라서 알맞는 맵 객체를 생성하는 함수가 지정된다. 분류된 결과를 누적하는 방식은 또 다른 컬렉터 downstream 를 지정하여 정할수 있으며 지정하지 않는다면 toList 컬렉터를 사용하여 List<T> 형식으로 누적된다.
  • filtering(predicate, downstream): 조건을 만족하는 요소만 수집하는 컬렉터를 생성한다. 최종 결과 생성은 추가로 지정된 컬렉터 downstream 에서 수행한다.
  • mapping(mapper, downstream): 스트림의 각 요소에 함수 mapper를 적용한 결과를 수집하는 컬렉터를 생성한다. 최종 결과 생성은 추가로 지정된 컬렉터 downstream 에서 수행한다.
  • collectingAndThen(downstream, converter): 지정된 컬렉터 downstream 에서 나온 결과값에 변환 함수 converter를 적용하여 돌려주는 컬렉터를 생성한다.
collectingAndThen은 어떤 경우에 사용될까? maxBy와 같은 컬렉터를 사용할 경우 결과값은 Optional<T> 이 된다. groupingBy 컬렉터는 값이 존재하는 경우에만 맵에 데이터를 넣게되므로 항상 값이 존재할 수 밖에 없다. 따라서 Optional wrapper를 굳이 유지할 필요가 없으므로 결과에 Optional::get을 적용하여 최종적으로는 해당 값을 맵에 저장할 수 있다.

분할

  • partitioningBy(predicate[, downstream]): 조건에 따른 참 true, 거짓 false 두 가지 경우로 스트림의 요소를 분할하여 수집하는 컬렉터를 생성한다. 수집 방식에 대한 컬렉터 downstream 을 지정하지 않는 경우 기본으로 toList 컬렉터가 사용된다.

커스텀 컬렉터

  • 컬렉터 클래스의 인터페이스는 아래와 같다.
// T: 스트림 요소의 타입, A: 누적자의 타입, R: 결과 타입 public interface Collector<T, A, R> { // 새로운 결과 컨테이너를 만든다 (e.g. ArrayList::new) Suppiler<A> supplier(); // 누적자에 스트림 요소를 더한다 (e.g. List::add) BiConsumer<A, T> accumulator(); // 누적자를 결과로 변환한다 (e.g. Function.identity()) Function<A, R> finisher(); // 두 누적자를 합친다 // (e.g. (l1, l2) -> { l1.addAll(l2); return l1; }) BinaryOperator<A> combiner(); // 컬렉터의 특성을 지정한다 //(e.g. Collections.unmodifiableSet( // EnumSet.of(IDENTITY_FINISH))) Set<Characteristics> characteristics(); };
  • 컬렉터의 특성은 아래와 같다
    • UNORDERED: 컬렉터 연산 결과가 입력 요소의 방문 순서를 보존하는 것을 보장하지 않는다.
    • CONCURRENT: 다중 스레드에서 동일한 컨테이너에 accumulator 를 동시에 호출할 수 있다 (따라서 컨테이너는 thread-safe 해야 한다). UNORDERED를 지정하지 않았다면 입력 스트림 또한 순서가 상관 없는 경우에만 동시에 누적 작업을 진행할 수 있다.
    • IDENTITY_FINISH: finisher를 호출하지 않고 누적자를 그대로 결과로 변환할 수 있는 경우 지정한다. 즉, 누적자를 그대로 사용할 수 있으며 누적자와 결과의 타입이 다르다면 안전하게 형 변환이 가능해야 한다.

병렬 데이터 처리

병렬 스트림

  • 컬렉션에 .parallelStream()을 호출하거나 스트림에 .parallel()을 호출하여 병렬 스트림을 얻을 수 있다.
  • .sequential()을 호출하여 다시 순차적으로 처리하는 스트림으로 변경할 수 있다.
  • 스트림의 중간 연산은 최종적으로 호출된 .parallel(), .sequential()의 영향을 받는다. 즉, 두 메서드를 몇번을 호출하더라도 마지막에 호출된 것에 따라 순차/병렬로 실행된다.
  • 병렬 스트림은 내부적으로 ForkJoinPool을 사용하여 실행되며 thread pool 크기의 기본값은 프로세서 수로 지정된다.
  • 특정 스트림을 위해서 thread pool 크기를 설정하는 것은 불가하다.
    • java.util.concurrent.ForkJoinPool.common.parallelism system property 값을 변경하여 thread pool 크기를 변경할 수 있지만 이는 모든 ForkJoinPool의 크기를 변경하는 것이므로 주의해서 설정해야 한다.
  • .iterate 와 같이 언바운드 스트림을 생성하는 연산 또는 .findFirst, .limit과 같이 순서에 의존하는 연산은 기본적으로 작업을 나눌 수 없기 때문(즉, 순차적)에 오히려 속도가 느려질 수 있다.
    • 반면 .findAny와 같은 연산은 요소의 순서와 관계없이 연산하므로 병렬로 수행하는 것이 이득일 수 있다.
    • 요소의 순서에 상관이 없다면 .unordered()를 호출하여 스트림을 비정렬된 상태로 만들고 여기에 .limit 과 같은 연산을 적용할 수 있다.
  • 데이터 소스가 random access가 가능해야 좀 더 효율적으로 분할이 가능하다 (e.g. ArrayList 👍 vs LinkedList 👎).
    • 물론 Spliterator를 직접 구현하여 이런 과정을 제어할 수 있다.
  • 중간 연산에 따라 스트림의 특성이 바뀔 수 있으며 이에 따라 분할 과정의 성능에 차이가 발생할 수 있다.
    • 필터 연산을 적용할 경우 결과 스트림의 크기가 예측 불가능하므로 효과적으로 분할할 수 없다.
  • 최종 연산의 병합 과정에서 발생하는 비용이 크다면 병렬화를 통해 얻는 이득이 크지 않을 수 있다.
  • 병렬 스트림의 중간 연산에서는 반드시 공유된 가변 상태를 피해야 한다.
  • 박싱, 언박싱 비용이 클 수 있으므로 필요하다면 기본형 특화 스트림을 사용한다(사실 병렬 스트림이 아니더라도 해당되는 이야기이다).

Spliterator

  • Spliterator는 Iterator와 유사하지만 데이터 소스를 분할할 수 있도록 추가 기능을 제공한다.
public interface Spliterator<T> { // Iterator의 next() + hasNext()의 역할을 한다. boolean tryAdvance(Consumer<? super T> action); // 자신의 일부 요소를 분할하여 두번째 Spliterator를 생성한다. Spliterator<T> trySplit(); // 정확하지는 않지만 탐색해야 될 요소의 숫자를 추정하여 제공한다. long estimateSize(); // Spliterator의 특성을 묘사한다. int characteristics(); };
  • 스트림의 분할은 .trySplit()의 결과가 null이 될 때 까지 재귀적으로 .trySplit()을 호출하여 이루어지며 .characteristics() 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
  • Spliterator의 특성은 다음과 같다.
    • ORDERED: 스트림의 요소에 정해진 순서가 있다.
    • DISTINCT: 스트림의 임의의 두 요소는 절대 같을수 없다.
    • SORTED: 스트림의 요소는 미리 정의된 정렬 순서를 따른다.
    • SIZED: 스트림의 크기가 정해져 있으므로 .estimatedSize()는 정확한 값을 반환한다.
    • NONNULL: 스트림의 모든 요소는 null이 아니다.
    • IMMUTABLE: 스트림의 데이터 소스는 불변이다. 따라서 요소를 탐색하는 동안 데이터 소스에 새로운 요소를 추가하거나 삭제하거나 교체할 수 없다.
    • CONCURRENT: 동기화 없이 데이터 소스를 여러 스레드에서 동시에 고칠 수 있다.
    • SUBSIZED: 분할된 모든 Spliterator가 SIZED 속성을 갖는다.
  • 작업을 분할할 때 임의의 위치에서 분할하면 안되는 경우에 직접 Spliterator를 작성할 필요가 있다.
    • 예: 문장에서 단어의 수 세기 → 임의의 위치에서 분할할 경우 단어 중간에서 분할하면 동일한 단어를 두 번 세는 경우가 생길 수 있다.