>100 Views
June 10, 19
スライド概要
2019年4月末に行われたSpark+AI Summit 2019の参加報告会の資料です。
2023年10月からSpeaker Deckに移行しました。最新情報はこちらをご覧ください。 https://speakerdeck.com/lycorptech_jp
Spark+AI Summit 2019 Apache Spark 2.4から Nested Columnsの10x性能改善 2019年6月10日 Poon Yat Sing (@oneonestar) Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved.
自己紹介 名前 : Poon Yat Sing - スター (Twitter: @oneonestar) 出身 : 香港 所属 : Yahoo! JAPAN (2017年10月入社) 仕事 : クエリエンジンの開発、チューニングと運用 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 2
アジェンダ • “Making Nested Columns as First Citizen in Apache Spark SQL”の発表共有 • Column Pruningの仕組み https://databricks.com/session/making-nested-columns-as-first-citizen-in-apache- spark-sql Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 3
発表内容を共有 Apple Siriチーム • データをNested構造に保存している • 5つのTopカラムに2千のNestedフィールドがある • クエリが一部のカラムだけを読む Spark 2.3 => Spark 2.4 • 1.2時間かかったクエリが3.3分間で終わる • 読むデータ量が7.1TBから840GBになる * Apple and Siri are trademarks of Apple Inc., registered in the U.S. and other countries # All the above information are quoted from “Making Nested Columns as First Citizen in Apache Spark SQL” by DB Tsai and C Delgado 4 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved.
Nested column pruning問題の歴史 Spark / SPARK-4502 Spark SQL reads unneccesary nested fields from Parquet Details Type: Improvement Status: RESOLVED Priority: Critical Resolution: Fixed Affects Version/s: 1.1.0 Fix Version/s: 2.4.0 Component/s: SQL Labels: None Target Version/s: 2.4.0 People Assignee: Michael Allman Reporter: Liwen Sun Votes: 47 Vote for this issue Watchers: 75 Start watching this issue Dates Created: 20/Nov/14 00:53 Updated: 10/Dec/18 23:57 Resolved: 24/Aug/18 04:33 Description When reading a field of a nested column from Parquet, SparkSQL reads and assemble all the fields of that nested column. This is unnecessary, as Parquet supports fine- grained field reads out of a nested column. This may degrades the performance significantly when a nested column has many fields. For example, I loaded json tweets data into SparkSQL and ran the following query: SELECT User.contributors_enabled from Tweets; SPARK-4502 、 HIVE-15055 、 Presto-2508 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 5
Nested column pruningの発展歴史 2010 Google Dremel論文発表 2013 Parquetファイルフォーマット発表 2014 SPARK-4502課題発見 (Spark 1.1.0) 2014+ Apple, Uber, Yahoo! JAPANなどの会社が 内製パッチを運用# 2018 SPARK-4502 => RESOLVED #GithubのPRとコメントや公開発表から推測したこと Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 6
Nested column pruningの発展歴史 Issue: SPARK-4502 2016 PR-14957 2017 PR-16578 2018 PR-21320 > Merged!! Conversation 67 Commits 10 Conversation 243 Commits 25 Conversation 208 Commits 24 2年以上レビューの対応と見直しをしてくれた、Michael(@mallman)さん、 本当にありがたい。 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 7
行指向と列指向フォーマット CREATE TABLE foo( A int, B int, C int ) A B C A1 B1 C1 A2 B2 C2 A3 B3 C3 行指向フォーマット: (CSV, Avro) A1 B1 C1 A2 B2 C2 A3 B3 C3 列指向フォーマット: (Parquet, ORC) A1 A2 A3 B1 B2 B3 C1 C2 C3 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 8
行指向フォーマット ユーザー SELECT B FROM Table; Computeエンジン (Spark, Hive, Presto) エンジン Storageに保存するファイル (S3, HDFS) A1 B1 C1 A2 B2 C2 A3 B3 C3 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 9
列指向フォーマット ユーザー SELECT B FROM Table; Computeエンジン (Spark, Hive, Presto) エンジン Storageに保存するファイル (S3, HDFS) A1 A2 A3 B1 B2 B3 C1 C2 C3 BのOffsetから読む Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 10
列指向フォーマットのメリット ユーザー SELECT B FROM Table; Computeエンジン (Spark, Hive, Presto) エンジン エンジン Storageに保存するファイル (S3, HDFS) A1 B1 C1 A2 B2 C2 A3 B3 C3 行指向フォーマット A1 A2 A3 B1 B2 B3 C1 C2 C3 列指向フォーマット Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 11
列指向フォーマットのメリット ユーザー SELECT B FROM Table; Computeエンジン (Spark, Hive, Presto) エンジン エンジン Column Pruning Storageに保存するファイル (S3, HDFS) A1 B1 C1 A2 B2 C2 A3 B3 C3 行指向フォーマット A1 A2 A3 B1 B2 B3 C1 C2 C3 列指向フォーマット Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 12
列指向フォーマットのメリット • OLAPとOLTPの重要な違いの1つ • 10x~100x性能の違い • ネットワーク、CPU、メモリの利用が少なくなる Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 13
Nested Column Pruningとは Parquet CREATE TABLE nested( A int, B struct<B1:string,B2:int>, C int ) A B C B1 B2 A1 B1_1 B2_1 C1 A2 B1_2 B2_2 C2 A3 B1_3 B2_3 C3 A1 A2 A3 B1_1 B1_2 B1_3 B2_1 B2_2 B2_3 C1 C2 C3 Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 14
Nested Column Pruningとは ユーザー SELECT B.B1 FROM Table; CREATE TABLE nested( A int, B struct<B1:string,B2:int>, C int ) エンジン エンジン A1 A3 B1_1 B1_2 B1_3 B2_1 B2_2 B2_3 C1 C3 A1 A3 B1_1 B1_2 B1_3 B2_1 B2_2 B2_3 C1 C3 Nested Column Pruningなし Nested Column Pruningあり Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 15
Nested Column Pruningの重要性 CREATE TABLE nested( A struct<A1:string,A2:int,A3:string,.....>, Sub-columnが多い (10~1000 カラム ) B struct< B1:struct<B1_1:string, B1_2 struct<......> > > ) 深いネストデータ (~十数層) Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 16
Nested Column Pruningの重要性 • テーブルのスキーマとクエリによって、2x~100X の改善がある CPU time spent Before After 0 500 1000 1500 2000 2500 3000 CPU Time (minutes) Others Project Read+Unzip Unnest #Nested Column Pruning Benchmark using Presto in Yahoo! JAPAN Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 17
Pruningの例
struct<a:bigint,b:int>
struct<c:struct<a:bigint,b:int>>
array<struct<a:bigint,b:int>>
map<bigint,struct<a:bigint,b:int>> (Spark-27241)
SELECT($"s.a") / WHERE($"s.a"===1)
Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 18
使い方 対象 : Nestedスキーマを使っている方 (Struct, Array) Sparkバージョン : 2.4以上 (3.0以上がおすすめ) ファイルフォーマット : Parquet (ORCは3.0以降のみ) “spark.sql.optimizer.nestedSchemaPruning.enabled” = true Known-Issue: SPARK-25407, SPARK-26975 (3.0以降解決された) Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 19
使い方
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.range(10)
.select($"id", struct(($"id" + $"id") as "c", ($"id" * $"id") as "d") as "a")
.write
.format("parquet")
.mode("overwrite")
.saveAsTable("nestedTab1")
// Exiting paste mode, now interpreting.
scala> spark.table("nestedTab1").select($"a.c").explain
== Physical Plan ==
*(1) Project [a#14.c AS c#20L]
+- *(1) FileScan parquet default.nestedtab1[a#14] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://
1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:struct<c:bigint,d:bigint>>
scala> spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", true)
scala> spark.table("nestedTab1").select($"a.c").explain
== Physical Plan ==
*(1) Project [a#14.c AS c#26L]
+- *(1) FileScan parquet default.nestedtab1[a#14] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://
1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:struct<c:bigint>>
Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 20
Appendix Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved.
実装する方法 Optimizer ルール : • ColumnPruning => NestedColumnAliasingの機能が追加された • SchemaPruning => DataSchemaをPruneして、 ParquetやORC Readerに渡す Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 22
NestedColumnAliasing
• TopカラムがAttributeで表される
• Column PruningのロジックはAttributeだけ対応される
• Limit/Sample/Repartitionがあるクエリも使えるように、
NestedカラムのAliasを作る
Before:
scala> spark.table("nestedTab1").repartition(1).select($"a.c").explain
== Physical Plan ==
*(2) Project [a#1.c AS c#13L]
+- Exchange RoundRobinPartitioning(1)
+- *(1) FileScan parquet default.nestedtab1[a#1] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://
ab1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:struct<c:bigint,d:bigint>>
After:
scala> spark.table("nestedTab1").repartition(1).select($"a.c").explain
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
- *(1) Project [a#1.c AS c#20L]
+- *(1) FileScan parquet default.nestedtab1[a#1] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://
warehouse/nestedtab1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:struct<c:bigint>>
Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 23
SchemaPruning • クエリOptimizerルールの1つ • PhysicalOperationを書き換えるため • 対応できるファイルフォーマットを判断(Parquet, ORCv1, ORCv2) • ProjectとFilterが使っていないStructFieldをPrune • RelationのDataSchemaを書き換える Copyright (C) 2019 Yahoo Japan Corporation. All Rights Reserved. 24