Stream Gatherers のツボを抑えよう!

4.6K Views

June 06, 25

スライド概要

JJUG 2025 Spring 登壇資料

profile-image

虎の穴ラボでエンジニアをしています

シェア

またはPlayer版

埋め込む »CMSなどでJSが使えない場合

ダウンロード

関連スライド

各ページのテキスト
1.

TORANOANA Lab Stream Gatherers の ツボを抑えよう! JJUG CCC 2025 Spring Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved. 2025/6/7 Shun Asami

2.

Shun Asami 虎の穴ラボ株式会社 エンジニア Java / Kotlin / TypeScript X @kawaiiseeker 最近業務で Ruby を書いています。型が恋しい... 今回JJUG初登壇!よろしくお願いします! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

3.

地球上の誰かがふと思った... Stream API でこんなコードが 書けたらいいな... Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

4.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

5.

List<Response> response = Stream で receivePackets() パケットを受け取り .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

6.

List<Response> response = receivePackets() filter で .filter(Packet::isWellFormed) 正しい形式のパケットに限定 .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

7.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) 一定バッファで .groupUsing(Packet::isFirst) パケットを順番通りに並べ替え .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

8.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) 隣接する重複を削除 .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

9.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) フラグを確認して .groupUsing(Packet::isFirst) グループ化 .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

10.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) リクエストオブジェクトに変換 .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

11.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call バックエンドサービスに ) リクエスト投げる .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

12.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) 結果をリストにして返却 .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

13.

こんなメソッドがあったらいいな - reorder(Comparator<? super T> comparator) - コンパレータの通りに並べ替え - sorted と違って無限ストリームにも対応 - deduplicateConsecutive(Function<? super T, ?> keyExtractor) - キーごとに隣接する重複を排除 - groupUsing - mapConcurrent Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

14.

他にも Stream API にこんなメソッドがあったらいいな - distinctBy(Function<? super T, ?> keyExtractor) - キーごとに重複を削除してくれる - distinct と違って無限ストリームに対応 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

15.

他にも Stream API にこんなメソッドがあったらいいな - distinctBy(Function<? super T, ?> keyExtractor) - キーごとに重複を削除してくれる - distinct と違って無限ストリームに対応 => あったらいいな~という中間操作はまだまだありそう... Java のバージョンアップごとにどんどん入れていく? どれを標準に採用してどれを見送るか判断が大変... Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

16.

どんな中間操作も 独自で定義できるようになったらいいな〜 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

17.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .reorder(Comparator.comparinLong(Packet::id)) .deduplicateConsecutive(Packet::id) .groupUsing(Packet::isFirst) .map(Result::fromPackets) .mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call ) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

18.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .???(reorder(Comparator.comparinLong(Packet::id)) .???(deduplicateConsecutive(Packet::id)) .???(groupUsing(Packet::isFirst)) .map(Result::fromPackets) .???(mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call )) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

19.

List<Response> response = receivePackets() .filter(Packet::isWellFormed) .gather(reorder(Comparator.comparinLong(Packet::id)) .gather(deduplicateConsecutive(Packet::id)) .gather(groupUsing(Packet::isFirst)) .map(Result::fromPackets) .gather(mapConcurrent( BackendService.maxConcurrentRequests(), BackendService::call )) .toList(); https://www.youtube.com/watch?v=8fMFa6OqlY8&t=2466s&ab_channel=Devoxx Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

20.

ということで Java 24 から Stream Gatherers が導入! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

21.

Stream Gatherers (ギャザラー) - Stream に gather メソッドが追加 - Gatherer オブジェクトを渡して使う ①標準で用意された Gatherer を使う windowFixed / windowSliding / fold / scan / mapConcurrent ②独自の Gatherer オブジェクトを用意する => 自分で定義すればどんな中間操作でも書ける! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

22.

どんな中間操作でも書けるとは どういうこと? Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

23.

いままでの中間操作で できたことおさらい Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

24.

Stream の中間操作は何ができる? - インクリメンタルな処理 … 上流から要素を受け取る毎に実行 (map) Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

25.

Stream の中間操作は何ができる? - インクリメンタルな処理 … 上流から要素を受け取る毎に実行 (map) - フィニッシャー処理 … 要素を全部受け取った後に実行 (sorted / distinct) Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

26.

Stream の中間操作は何ができる? - インクリメンタルな処理 … 上流から要素を受け取る毎に実行 (map) - フィニッシャー処理 … 要素を全部受け取った後に実行 (sorted / distinct) - 短絡(short-circuit) … 途中で処理を中断して、その後の処理は省略 (limit / takeWhile) Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

27.

Stream の中間操作は何ができる? - インクリメンタルな処理 … 上流から要素を受け取る毎に実行 (map) - フィニッシャー処理 … 要素を全部受け取った後に実行 (sorted / distinct) - 短絡(short-circuit) … 途中で処理を中断して、その後の処理は省略 (limit / takeWhile) - 要素数の変更 … 1:N、N:1、N:Mに自由にストリームの要素数を変更 (filter / flatMap / mapMulti) Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

28.

Stream の中間操作は何ができる? - インクリメンタルな処理 … 上流から要素を受け取る毎に実行 (map) - フィニッシャー処理 … 要素を全部受け取った後に実行 (sorted / distinct) - 短絡(short-circuit) … 途中で処理を中断して、その後の処理は省略 (limit / takeWhile) - 要素数の変更 … 1:N、N:1、N:Mに自由にストリームの要素数を変更 (filter / flatMap / mapMulti) - ステートフルな処理 … 内部的に状態を保持してインクリメンタル処理 (limit) Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

29.

Stream の中間操作は何ができる? - 各要素ごとの インクリメンタルな処理 が書ける - 要素は1:1だけでなく1:N、N:1、N:Mに自由に要素数の変更 ができる - 状態を持った処理 が書ける - 途中で 短絡(short-circuit) できる - 最後に フィニッシャー処理 が書ける 中間操作はこんなにポテンシャルがあるが、 今までは標準で用意されていたメソッドの範囲でしか使えなかった... Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

30.

Stream の中間操作は何ができる? - 各要素ごとの インクリメンタルな処理 が書ける - 要素は1:1だけでなく1:N、N:1、N:Mに自由に要素数の変更 ができる - 状態を持った処理 が書ける - 途中で 短絡(short-circuit) できる - 最後に フィニッシャー処理 が書ける 中間操作はこんなにポテンシャルがあるが、 今までは標準で用意されていたメソッドの範囲でしか使えなかった... Gatherer でこれらの操作を全て自由に組み合わせて 独自の処理を実装できるようになりました!!! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

31.

終端操作との違いは? 終端操作の Collector はこれまでも拡張できた しかし終端操作ゆえに無限ストリームには対応できなかった Gatherer を使えば無限ストリームに対しても 独自処理をあてることができる Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

32.

Gathererで書けるようになったコード例 ピタゴラス数を列挙するコード 1から数字の無限ストリームを生成 長さ3の組み合わせを独自ギャザラーで生成 => 3つの数字の組み合わせを小さい順に生成 最初に見つかった5個を 標準出力に書き出す a² + b² = c² 原始ピタゴラス数となる 組み合わせを filter Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

33.

独自の Gatherer の実装方法 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

34.

独自の Gatherer の実装方法 - Gatherer オブジェクトを作って gather メソッドに渡して使う - Gatherer オブジェクトはファクトリメソッドから作成する - Gatherer.of … 並列化に対応する場合 - Gatherer.ofSequential … 並列化に対応しない場合 基本はこちらを使えばOK Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

35.

独自の Gatherer の実装方法 - Gatherer のファクトリメソッドの引数 - initializer … ステートフルな処理を書く場合の 状態を保持するクラスの Supplier - integrator … インクリメンタルな処理 - finisher … フィニッシャー処理 - combiner … Gatherer.of 側にだけある並列化対応用の結合関数 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

36.

integrator とは - 要素を受け取った時の インクリメンタルな処理 を実装する関数 ラムダで書く場合 型を明示的に書きたい場合は Integrator.of で囲む Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

37.

integrator とは - 上流から要素が渡ってきた時の インクリメンタルな処理 の実装を書く - 引数は以下の3つ - state … ステートフルな処理のときに使う状態オブジェクト - element … 渡ってきた要素 - downstream … 下流の参照 - downstream.push(newElement) で下流に要素を渡せる => 呼び出す回数次第で自由に 要素数の変更ができる - 戻り値が true なら処理続行、false なら短絡する - downstream.isRejecting() で下流が短絡しているか判定できる - 短絡しない場合は Integrator.ofGreedy を使うと最適化される Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

38.

ややこしいので 例を見てよう! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

39.

windowFixed を例に見てみよう! - 指定されたサイズに区切ってリストにする - 最後長さが満たない余りもリストが作られる Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

40.

windowFixed を例に見てみよう! Gatherer windowFixed(3) state=[] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

41.

windowFixed を例に見てみよう! 上流 1 Gatherer windowFixed(3) state=[1] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

42.

windowFixed を例に見てみよう! 上流 1 2 Gatherer windowFixed(3) state=[1, 2] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

43.

windowFixed を例に見てみよう! 上流 1 2 3 Gatherer windowFixed(3) state=[1, 2, 3] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

44.

windowFixed を例に見てみよう! 上流 1 2 3 Gatherer windowFixed(3) 下流 state=[] [1, 2, 3] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

45.

windowFixed を例に見てみよう! 上流 1 2 3 4 Gatherer windowFixed(3) 下流 state=[4] [1, 2, 3] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

46.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 Gatherer windowFixed(3) 下流 state=[4, 5] [1, 2, 3] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

47.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 6 Gatherer windowFixed(3) 下流 state=[4, 5, 6] [1, 2, 3] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

48.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 6 Gatherer windowFixed(3) 下流 state=[] [1, 2, 3] [4, 5, 6] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

49.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 6 7 Gatherer windowFixed(3) 下流 state=[7] [1, 2, 3] [4, 5, 6] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

50.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 6 7 Gatherer windowFixed(3) 下流 終了 state=[7] [1, 2, 3] [4, 5, 6] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

51.

windowFixed を例に見てみよう! 上流 1 2 3 4 5 6 7 Gatherer windowFixed(3) 下流 終了 state=[] [1, 2, 3] [4, 5, 6] Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved. [7]

52.

ユーティリティクラスに windowFixed メソッドを作成 T を List<T> にするので Gatherer<T, ?, List<T>> Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

53.

ファクトリメソッド Gatherer.ofSequential を使用 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

54.

第一引数 initializer state に使うリストを用意 Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

55.

第二引数 integrator インクリメンタルな処理をここに書いていく Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

56.

windowFixed は短絡しないので Integrator.ofGreedy を使うと良い 最後は return true; Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

57.

ただし下流が短絡した場合は 伝播させる Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

58.

要素をまず state に格納 state の長さが一定になったら 下流にリストを push する Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

59.

第3引数 finisher フィニッシャー処理を書く Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

60.

state に残りがあったら それをリストとして下流に push する Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

61.

完成! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.

62.

まとめ Stream Gatherers の導入で - 中間操作でできることを独自実装で組み合わせて使えるようになった インクリメンタル / フィニッシャー / 短絡 / 要素数の変換 / ステートフル処理 - 無限ストリームに対しても独自処理がかけるようになった! 実装方法は時間がなく駆け足でしたが... 以前ブログに同じ内容を書いているので ぜひご覧ください! ご清聴ありがとうございました! Copyright (C) 2025 Toranoana Lab Inc. All Rights Reserved.