635 Views
July 22, 19
スライド概要
本セッションでは、Apache IgniteとApache Sparkの統合やHTAP処理におけるその活用について紹介します。
2023年10月からSpeaker Deckに移行しました。最新情報はこちらをご覧ください。 https://speakerdeck.com/lycorptech_jp
Data Engineering and Data Analysis Workshop Apache IgniteとApache Sparkの統合によるHTAP活用 Roman Shtykh ヤフー株式会社
自己紹介 • エンジニア @ ヤフー • PMC & Committer @ Apache Ignite • Committer @ Apache RocketMQ, MyBatis • twitter: @rshtykh
Apache Spark • 分散処理(コンピュート)エンジン • 従来のBI処理だけではなく高度なアナリティクス(MLなど)も可能とする • コンピュート処理のエラーハンドリング、マシーン間のデータ移動(shuffling) • 巨大なデータの処理にリレーショナルやプロシージャ型APIを提供し、クエリー処理やデータ分析でmixさせることも可能とする • DataFrame (DataSet) 抽象的なデータ構造のパワー
What happens when not enough memory? • 大量なデータ + reduceByKey, groupByKey, join処理によるshuffling • 1 executorあたりのメモリリミットによりtemporary file生成 • Spill files • Shuffle files • Cache files http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/
他にも • 長期間保存メカニズムはないため、外部ストレージが必要 • データロード • データオフロード • 複数フェーズに分けた処理において、ディスク読み書きでパフォーマンス低下
Disaggregated Storage and Compute Architecture (1) • Collocated cluster • 各クラスターノードはコンピュート+ストレージの役割を担う • Disaggregated cluster • コンピュート+ストレージは別々のクラスターで 参考: https://databricks.com/session/taking-advantage-of-a-disaggregated-storage-and-compute-architecture
Disaggregated Storage and Compute Architecture (2) • Why collocated cluster? • Data gravity (データ重力) • Data & compute collocation Speed of light problem Principles of data gravity 参考: https://cloudarchitectmusings.com/2017/01/30/the-aws-lovehate-relationship-with-data-gravity/
Disaggregated Storage and Compute Architecture (3) • Why disaggregated? 1. 処理に合わせたハードウェア選定が可能となる 1. 処理の最適化 2. コストの最適化 2. リソース削減 1. コストDOWN 2. より簡単・明確なデータ管理 3. より簡単なクラスター管理 3. SLAに合わせたクラスターリソースのmixが可能となる 4. ソフトウェア管理の柔軟性が上がる We choose disaggregation, but at performance regression costs
Disaggregated Storage and Compute Architecture (4) • パフォーマンス低下 • ディスクwriteに • ネットワークレイテンシー • 様々な故障(新たなPOF) • 非効率なネットワークの利用 How to avoid performance regression keeping advantages of disaggregation?
In-memory Acceleration Layer Yuan Zhou, Haodong Tang, Jian Zhang, “Spark-PMoF: Accelerating big data analytics with Persistent Memory over Fabric”, Strata Data Conference 2019 https://conferences.oreilly.com/strata/strata-ca-2019/public/schedule/detail/72992
Example: Shuffle Disaggregation with Crail (1) https://crail.incubator.apache.org/blog/2019/03/disaggregation.html
Example: Shuffle Disaggregation with Crail (2) • Disaggregated Spark Map-Reduce (Sorting) • Spark job sorting 200G of data on a 8 node cluster • Crail on a 4 node storage cluster connected to the compute cluster over a 100 Gb/s RoCE network https://crail.incubator.apache.org/blog/2019/03/disaggregation.html
Apache Igniteの場合?
Apache Ignite • メモリを中心に据えた分散 • データベース • キャッシュ • データ処理(コンピュート) プラットフォーム https://apacheignite.readme.io
Apache Ignite • 高性能分散インメモリープラットフォーム • スケーラビリティ・高パフォーマンス・耐障害性 • データは複数マシーンに分散される • 動的なノード(リソース)追加・削除 • リニアなスケーラビリティ • 冗長化(耐障害性) • SPOFはない • 低レイテンシー • 並列処理(高パフォーマンス) • データ局所性尊重
Apache Ignite/Apache Spark比較 Apache Ignite • 分散メモリ中心DB • SQL, transactions, key-value, collocated processing, ML • OLAP and OLTP Apache Spark • HDFSのような外部ストレージからデータ取得 • ストリーミング・コンピュートエンジン • OLAPよりの処理 • MR payloads中心
Apache Ignite + Apache Spark IgniteをSparkのストレージとして用いる • Sparkワーカー間の迅速かつ容易なステートシェアリング • データ移動の最小化やインデックス利用でSpark SQLのパフォーマンス向上 • OLTP、OLAP、機械学習はオーバヘッドが少なく https://spark.apache.org/docs/latest/cluster-overview.html
Apache Ignite + Apache Spark
Working with RDDs • RDDインターフェース • 容易なステートシェアリング • RDD ← Igniteパーティションマッピング // Ignite Shared RDD val sharedRDD: IgniteRDD[String, MyType] = igniteContext.fromCache[String, MyType]("myCache") // some transformations sharedRDD.map(x => ...).filter(...).map(...)
Working with tables (1) • Ignite SQLテーブルを読む • DataFrame → Ignite SQLテーブルに書き込む // read data val userDF = spark.read.format(IgniteDataFrameSettings.FORMAT_IGNITE) //Data source type. .option(IgniteDataFrameSettings.OPTION_TABLE, "USER") //Table to read. .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, "path_to_ignite_config") //Ignite config. .load() • SparkSessionの拡張であるIgniteSparkSessionのIgniteExternalCatalogを利用することによりIgniteテーブルの明示的な登録が不要 // This will print out info about all SQL tables existed in Ignite. igniteSession.catalog().listTables().show();
Working with tables (2) SQL Optimization • CatalystクエリーオプティマイザにIgniteOptimizationを • データ移動を最低限に • →より高速なクエリー実行プラン https://habr.com/ru/company/sberbank/blog/427297/
Apache Ignite SQL 分散 (co-located) JOIN 1. リクエスト 2. ローカルデータに対する処理 3. その結果のマージ https://ignite.apache.org
Apache Ignite + Apache SparkによるHTAP • ストリーム処理やOLAPクエリーはSparkで • 機械学習はどちらでも • Igniteをストレージに用いてワーカーのステートシェアリングにより高速な処理 • Ignite SQLインデックスサポートにより高速なクエリー • Spark APIはほぼそのまま利用可能
Use case 1 (1) • Telcomユースケース • 通話質のリアルタイム改善 • データ量: 数Gb/s https://mapr.com/blog/performance-tuning-kafka-spark-streaming-telecom/
Use case 1 (2) https://mapr.com/blog/performance-tuning-kafka-spark-streaming-telecom/
Use case 2 • リアルタイム株価の予測 https://www.imcsummit.org/2018/eu/session/when-one-minute-can-cost-you-million-predicting-share-prices-real-time-apache-spark-and
Q&A
IN-MEMORY COMPUTING MEETUP TOKYO