2K Views
May 26, 17
スライド概要
2017/05/11に開催された「Tokyo Airflow Meetup #1」での発表資料です。
ジョブ管理システムをApache OozieからApache Airflowに変えた際の事例紹介となります。
イベントURL
https://www.meetup.com/ja-JP/Tokyo-Apache-Airflow-incubating-Meetup/events/238731591/
2023年10月からSpeaker Deckに移行しました。最新情報はこちらをご覧ください。 https://speakerdeck.com/lycorptech_jp
OozieをやめてAirflowを導⼊してみた話 Tokyo Airflow Meetup #1 Yahoo! JAPAN D&S統括本部 データプラットフォーム本部開発2部 コマースインフラ 植草 智輝
アジェンダ ⾃⼰紹介 Airflowについて 事例紹介 Airflowを使って良かったこと ⼿が届かなかったところ まとめ
⾃⼰紹介 名前 : 植草 智輝 ⼊社 : 新卒⼊社2年⽬ twitter : @tmk_ueks github : tomueeen93 仕事 : Y!ショッピング広告関係のETL基盤構築 ジョブ管理システムをOozieからAirflowに 趣味 : ゲーム / ボルダリング / ハッカソン 3 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Airflowについて 4 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Airflowとは? • Airbnb製のWorkflow Engine • Apache(incubating)のOSSプロジェクト • Python製 • WebUIでタスクモニタリングできる • プラグインによる拡張が可能 5 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Workflow Engineとは? 複数の処理の依存関係を定義し、それらの実⾏を管理するもの 近年、OSSでも多数のWorkflow Engineが公開されている。 参考: Workflow Engines Meetup#1 6 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Airflowの特徴 • DAG(処理フロー)をコードで定義できる • 標準で様々なシステム連携がサポートされていて簡単に利⽤できる • Pythonによるプラグイン拡張ができる • 標準に含まれていない機能を補える • スケーラブル • 処理を実⾏するWorkerの分散が可能 7 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
事例紹介 8 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
プロダクト概要 • ヤフーショッピング広告関連のETL基盤の開発 • ストア向けの広告レポートの集計 • 社内向けにはアナリストなどへのレポートの集計 9 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
以前のETL基盤 サービスログ hadoopクラスタ Impalaクラスタ ユーザー load Job管理 Job管理 cron • ショッピング関連広告データのレポーティング 運⽤ 運⽤ • クラスタ別に2つのOozieの運⽤ • ワークフロー定義もそれぞれ別に開発 運⽤者 • サービスデータの連携などはcronで動かしていた Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌ 10
Oozieを利⽤していた時に感じていた課題 • XMLがしんどい • タスクの確認や再実⾏が⾯倒 • CLIでJob IDを確認して、再実⾏とかするのがしんどい • クラスタごとにOozieを管理しなければいけない • Hadoop外の連携が⾯倒 • GMTでの時間指定のみ • コミュニティがあまり活発ではない 11 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
現在のETL基盤 サービスログ Kylinサーバー (on HBaseクラスタ) hadoopクラスタ ユーザー load Job管理 Job管理 • 集計システムをImpalaからKylinに変更 Job管理 運⽤ • Hiveによる集計に変更 • ジョブ管理をOozieからAirflow1.7.1.3に変更 • プロジェクトのみでのパイロット導⼊ 運⽤者 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌ 12
システムの規模 • タスク数 約40個 • ⼊⼒ソースのチェックタスク • Hiveクエリ • Kylinのキューブビルド関係のタスク • その他マスターデータとの連携など • デイリーのデータ増加量 約1TB • デイリーの実⾏時間平均 約4時間 13 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Airflowを使ってよかったこと 14 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
WebUIで運⽤のしやすさが向上 依存関係を⼀覧で⾒れるので、原因究明が速い 👆 再集計時がWebUIから⼀括で出来る 15 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Tree Viewを利⽤して運⽤ • ⼀番使っている画⾯ • 過去タスクの⼀覧表⽰されている • 仕様変更時など再集計時などに使う • 過去分の⼀括クリアなどが出来て便利 • 表⽰する⽇付が増えると⾒づらい 16 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
クエリチューニングなどもしやすい • ガントチャートで実⾏時間が⾒れる • 時間のかかっているタスクが分かる • クエリチューニングをする時などに利⽤ • 不⾃然に時間がかかっていたら⾒直す • 縦線が無いので若⼲⾒づらい(最新版はある) 17 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
標準に⾜りない部分はPluginを作れる • クラスタのネットワークポリシー上MetastorePartitionSensorが利⽤不可 • 全てのHiveOperatorで共通したhiveconfを指定したい • quota使⽤量のメトリクスが取得したい • etc.. 独⾃環境依存の部分に対してはPluginで対応 18 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
プラグインの書き⽅と種類 • 基本的なクラスを継承したクラスを作成 • 代表的なもの • Operator : execute関数に書かれた処理を実⾏する • Sensor : poke関数の返り値がTrueになるまで処理を実⾏する • Hook : MySQLやHiveなど他のシステムを実⾏する際に利⽤する • AIRFLOW_HOME/plugins以下にpythonスクリプトを作成 • 作成したクラス • AirflowPluginを継承したクラスに作成したPluginを定義したもの Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌ 19
MetastorePartitionSensorの代替Plugin拡張
class MyHivePartitionSensor(HivePartitionSensor):
ui_color = '#83ccd2'
@apply_defaults
def __init__(self,
table,
partition="ds='{{ ds }}'",
hive_cli_conn_id='hiveserver_default',
schema='default',
poke_interval=60 * 3,
*args, **kwargs):
super(MyHivePartitionSensor, self).__init__(
table=table,
partition=partition,
metastore_conn_id=hive_cli_conn_id,
schema=schema,
poke_interval=poke_interval,
*args, **kwargs)
self.hive_cli_conn_id = hive_cli_conn_id
def poke(self, context):
継承したHookが呼び出される
if '.' in self.table:
self.schema, self.table = self.table.split('.')
logging.info(
'Poking for table {self.schema}.{self.table}, '
'partition {self.partition}'.format(**locals()))
if not hasattr(self, 'hook'):
self.hook = DspfecHiveCliHook(
hive_cli_conn_id=self.hive_cli_conn_id)
return self.hook.check_for_partition(
self.schema, self.table, self.partition)
HivePartitionSensorを継承したクラスを作成
←Sensor側
↓Hook側
SHOW PARTITIONSの出⼒を取得
class MyHiveCliHook(HiveClre.compile(riHook):
re_partition_ok = '^1 row selected', re.MULTILINE)
def check_for_partition(self, schema, table, partition):
hive_result = self.run_cli(
'SHOW PARTITIONS {schema}.{table} PARTITION({partition})'.format(
**locals()))
matches = self.re_partition_ok.findall(hive_result)
return len(matches) > 0
Beelineの出⼒から正規表現によってパーティション判定
パーティションが出来る(Trueになる)まで定期実⾏
Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
20
HiveOperatorでhiveconf共通化するPlugin拡張
Plugin側
class MyHiveOperator(HiveOperator):
@apply_defaults
def __init__(self,
hql,
hive_cli_conn_id='hive_cli_default',
schema='default',
hiveconf_jinja_translate=False,
script_begin_tag=None,
run_as_owner=False,
hiveconf=None,
*args, **kwargs):
super(MyHiveOperator, self).__init__(
hql=hql,
hive_cli_conn_id=hive_cli_conn_id,
schema=schema,
hiveconf_jinja_translate=hiveconf_jinja_translate,
script_begin_tag=script_begin_tag,
run_as_owner=run_as_owner,
*args, **kwargs)
self.hive_cli_params = ''.join(
[' --hiveconf {}={}'.format(k, v) for k, v in hiveconf.items()]
) if hiveconf else ''
def execute(self, context):
self.hook = self.get_hook()
# add own setting
self.hook.hive_cli_params += self.hive_cli_params
hiveconfs = {
'tez.queue.name': 'scheduled',
'mapred.job.queue.name': 'scheduled',
'hive.exec.scratchdir': '/user/owner/airflow_tmp',
'hive.tez.exec.print.summary': 'true',
'hive.stats.dbclass': 'fs',
'tez.am.view-acls': '*'
}
DAG側
DAGで共通のhiveconfを指定できるようにする
beeline実⾏時のオプションに上のhiveconfを連結
Hookに渡すhive_cli_paramsを上書きする
※本来はConnectionsのExtraに⼊⼒されているものが⼊る
logging.info('Executing: ' + self.hql)
self.hook.run_cli(hql=self.hql, schema=self.schema)
21
Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
開発⾯の効率アップ Oozie Airflow workflow修正 DAG修正 ここで構⽂チェック出来る 古いworkflow削除 直接修正できないストレス HDFSにPut oozie run(dryrun) airflow run ここでようやく構⽂チェック 22 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
⼿が届かなかったところ 23 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
再集計時の実⾏順が選べなかった • 再集計時などに古い⽇付から順に実⾏を進めたい • ⼀気にタスクをクリアするとschedulerが拾った順にqueueに⼊る • 結果的に順不同に並列実⾏される • 並列実⾏NGかつ、古い順に実⾏したい • 現状では再集計するスクリプト(or DAG)を作成して対応 再実⾏タスク 順番に実⾏したい 24 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
Pluginによるコード肥⼤化・複雑化の可能性 • なんでもPluginにしていると何してるかわからなくなる • ⼀般的なPythonのパッケージ管理⽅法でしっかり分割管理 • 通知機能などはpip packageに分割している • Dagはあくまで定義に留める、Pluginもあくまで機能拡張 • 良い管理⽅法あったら教えて下さい 25 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
まとめ 26 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌
まとめ • Good • WebUIがリッチになったことにより、運⽤のしやすさが激的にUP • 全てPythonベースで書けるので、読みやすくワークフロー定義の負担が少ない • Pluginを柔軟に作成できるので、外部のシステム連携がしやすい • Bad • 再集計時に⼀気に実⾏すると実⾏順が選べない • Pluginで⾊々やりすぎると何をしているかわからなくなるかもしれない 27 Copyright (C) 2017 Yahoo Japan Corporation. All rights reserved. 無断引⽤・無断転載禁⽌