データのワークフロー、スケジューリングができるCloud Composerの使い方についてご紹介します。
Cloud ComposerはApache Airflowのマネージドサービスで、インストールなどの環境構築をせずに利用することができます。
今回は、BigQueryでテーブル作成した後、CSVファイルをGoogle Cloud Storageへ格納するワークフローを実装します。
目次
実装手順
環境構築
コンソール画面からCloud Composerを開き、APIを有効にします。
環境の作成からComposer 1 を選択し、名前とロケーションのみ設定して作成を押します。
Composer 1 と 2 の違いは公式ドキュメントに詳細に記載されています。
15分〜20分ほどでクラスタが立ち上がり、Airflowウェブサーバー、DAGを管理するGoogle Cloud Storageにアクセスできるようになります。
DAGの作成
データの取り込みから出力までのワークフローをDAGで作成します。Pythonで記述し、一連のタスクの依存関係を定義します。
下記はBigQueryに同期してあるGAデータをSQLで整形して保存し、Google Cloud StorageにCSV出力するワークフローです。
Githubにもあげていますので、ご参照ください。
import datetime
import airflow
from airflow.models import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.models import Variable
from datetime import timedelta
import re
import json
import requests
gcppj = Variable.get('gcp-pj') # ①Variablesから値を取得
gcpds = Variable.get('gcp-ds')
gcs = Variable.get('gcs')
yesterday = format(datetime.date.today()-timedelta(days=1),'%Y%m%d')
webhook_url = Variable.get('slack_webhook_url')
env = Variable.get('env')
# ②ワークフロー失敗時のアラート通知
def failured(status):
dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
data = {
'username': 'composer-sample',
'channel': 'slack-channel',
'attachments': [{
'fallback': f'{dag_name}:{task_name}',
'color': '#e53935',
'title': f'{dag_name}:{task_name}',
'text': f'[{env}]{task_name} was failed...'
}]
}
requests.post(webhook_url, json.dumps(data))
default_args = {
'owner': 'sample-workflow',
'depends_on_past': True, # ③前のタスクが正常に完了したら次のタスクを実行
'retries': 1,
'retry_delay': datetime.timedelta(minutes=3),
'start_date': datetime.datetime(2021, 7, 9)
'on_failure_callback': failured
}
dag = DAG(
'sample-workflow',
schedule_interval='0 4 * * *', # ④UTC時間でスケジュールを設定
default_args=default_args,
catchup=False,
)
# テーブル作成
create_table = bigquery_operator.BigQueryOperator(
task_id='create_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
sql='/sql/sample.sql', # ⑤実行するSQLのファイルを指定
params={'gcppj':gcppj, 'gcpds':gcpds, 'yesterday':yesterday}, # ⑥渡すパラメーターの設定
destination_dataset_table='{}.{}.analytics_table'.format(gcppj, gcpds),
execution_timeout=timedelta(minutes=15), # ⑦実行制限時間
dag=dag
)
# gcs出力
export_csv = BigQueryToCloudStorageOperator(
task_id='export_csv',
source_project_dataset_table='{}.{}.analytics_table'.format(gcppj, gcpds),
destination_cloud_storage_uris='gs://{}/analytics_{}.csv'.format(gcs, yesterday),
compression=None,
export_format='CSV',
field_delimiter=',',
print_header=True,
bigquery_conn_id='bigquery_default',
delegate_to=None,
labels=None,
execution_timeout=timedelta(minutes=10),
dag=dag
)
# ⑧依存関係定義
create_table >> export_csv
①Variablesから値を取得
Airflowから環境変数を設定し、その値を取り出すことができます。jsonファイルでのインポートも可能で、Variables.jsonをインポートすると以下のようにセットされます。
AirflowのウェブサーバはComposerのコンソールから遷移できます。
②ワークフロー失敗時のアラート通知
ワークフロー失敗時にメールやslack通知が可能です。今回はslack通知の設定をしています。
③前のタスクが正常に完了したら次のタスクを実行
この値をTrueにするとで設定できます。
④UTC時間でスケジュールを設定
実行時間を指定できます。注意点としてAirflowはUTC時間なので、実行したい日本時間から-9時間引いて設定します。
⑤実行するSQLのファイルを指定
Dagsから相対パスで指定することが可能です。
⑥渡すパラメーターの設定
ここではSQL内で利用するパラメータを渡しています。
⑦実行制限時間
タスクの制限時間を指定します。ここの設定がないと、永遠にタスクが実行中となる場合があります。
⑧依存関係定義
依存関係の定義をします。
Aを実行→B,Cを実行→Dを実行という場合であれば、A >> [b,c] >> dというように定義します。
手動デプロイと実行
DAGフォルダを格納するGoogle Cloud Storageへファイルを置くと、Airflowへ反映されます。
ComposerのコンソールからDAGフォルダへ遷移し、ファイル・フォルダのアップロードをします。
Airflowウェブサーバへアクセスすると、先ほどアップロードしたsample-workflow DAGが反映されています。
成功すると、BigQueryにテーブルが作成され、Google Cloud StorageにCSVファイルが出力されています。
Cloud Source RepositoriesとCloud BuildによるCI/CD
Cloud Source RepositoriesとCloud Buildを利用することで、ビルド・テスト・デプロイを自動化することができます。
今回はmasterにpushされたら自動でGoogle Cloud Storageに反映するように設定してみます。
GithubとCloud Source Repositoriesを連携
まずはGithubとCloud Source Repositoriesを連携します。
コンソールからCloud Source Repositoriesへ遷移し、開始からリポジトリの作成へと進みます。APIの有効化画面に遷移した場合は、有効にしてください。
Cloud Source Repositoriesに何かしらのリポジトリがないと、外部リポジトリを接続した際に”リポジトリに接続できませんでした”というエラーが発生します。
そのため、まずはリポジトリの追加で”新しいリポジトリの作成”を選択して、空のリポジトリを作ります。
すべてのリポジトリへ移り、リポジトリの追加で今度は”外部リポジトリの接続”を選択します。
GCPプロジェクトとGitプロバイダを選択して、認証情報にチェックします。
GitHubとGCPの連携認証を求められるので、GitHubにログインして画面の表示通りに進めます。認証が済むとリポジトリ一覧が表示されるので、対象のリポジトリを選択して接続します。
Cloud Buildの設定
Cloud Buildの画面へ遷移し、まずはCloud BuildからGoogle Cloud Storage、Cloud Composerへのアクセス権限を付与します。設定のサービスアカウントをコピーし、IAMと管理でストレージ管理者・Composer管理者を付与します。
Cloud Build画面へ戻り、”ビルドトリガーを設定”で①〜④の部分を設定し、トリガーを作成します。
①プロジェクトで一意の名前をつけます。
②デプロイするイベントを選択します。今回は”ブランチにpushする”を選択します。
③ソースを選択します。Cloud Source Repositoriesで作成したリポジトリを選択し、ブランチは^master$にします。
④masterにpushされたときに動かす設定ファイルを選択します。”Cloud Build構成ファイル(yaml または json)”を選択し、Locationはリポジトリにします。
続いてcloudbuild.yamlファイルを作成します。dags配下をGoogle Cloud Storageへコピーする処理をyamlファイルに記述します。
steps:
- name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args:
- '-exc'
- |
gsutil cp -r dags/* gs://xxx/dags
gs://xxx/dags部分は、DAGSフォルダを格納するGoogle Cloud StorageのURIへ変更します。
追加したyamlファイルをmasterへpushするとCloud Buildが実行され、成功するとソースコードがGoogle Clouod Storageへ反映されます。
コスト
Cloud Composerの料金は、環境スペックと実行時間のほか、実行環境でCompute Engine、データの保存でGoogle Cloud Storageのコストがかかります。
試算ツールで計算すると、最小構成であるノード数3つの場合1ヶ月に約43,000円ほどかかります。
まとめ
Cloud Composerでワークフローを構築する方法をご紹介しました。
オンプレからクラウドまでの接続コネクタがあるので、取り込みから出力まで一連のワークフローの実装が可能です。
BigQuery内で完結できるワークフローにはDataformを利用し、BigQuery以外も利用する場合はCloud Composerという使い分けができるように思いました。
Dataformの使い方は以前の記事をご参照ください。この記事が参考になれば幸いです!