Cloud Composerでワークフローを構築する方法〜BigQueryからGoogle Cloud StorageにCSVを出力する〜

データのワークフロー、スケジューリングができるCloud Composerの使い方についてご紹介します。

Cloud ComposerはApache Airflowのマネージドサービスで、インストールなどの環境構築をせずに利用することができます。

今回は、BigQueryでテーブル作成した後、CSVファイルをGoogle Cloud Storageへ格納するワークフローを実装します。

実装手順

環境構築

コンソール画面からCloud Composerを開き、APIを有効にします。

環境の作成からComposer 1 を選択し、名前とロケーションのみ設定して作成を押します。

Composer 1 と 2 の違いは公式ドキュメントに詳細に記載されています。

15分〜20分ほどでクラスタが立ち上がり、Airflowウェブサーバー、DAGを管理するGoogle Cloud Storageにアクセスできるようになります。

DAGの作成

データの取り込みから出力までのワークフローをDAGで作成します。Pythonで記述し、一連のタスクの依存関係を定義します。

下記はBigQueryに同期してあるGAデータをSQLで整形して保存し、Google Cloud StorageにCSV出力するワークフローです。

Githubにもあげていますので、ご参照ください。

import datetime

import airflow
from airflow.models import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.models import Variable
from datetime import timedelta
import re
import json
import requests

gcppj = Variable.get('gcp-pj') # ①Variablesから値を取得
gcpds = Variable.get('gcp-ds')
gcs = Variable.get('gcs')
yesterday = format(datetime.date.today()-timedelta(days=1),'%Y%m%d')
webhook_url = Variable.get('slack_webhook_url')
env = Variable.get('env')

# ②ワークフロー失敗時のアラート通知
def failured(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
    data = {
            'username': 'composer-sample',
            'channel': 'slack-channel',
            'attachments': [{
                'fallback': f'{dag_name}:{task_name}',
                'color': '#e53935',
                'title': f'{dag_name}:{task_name}',
                'text': f'[{env}]{task_name} was failed...'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

default_args = {
    'owner': 'sample-workflow',
    'depends_on_past': True, # ③前のタスクが正常に完了したら次のタスクを実行
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=3),
    'start_date': datetime.datetime(2021, 7, 9)
    'on_failure_callback': failured
}
dag = DAG(
    'sample-workflow',
    schedule_interval='0 4 * * *', # ④UTC時間でスケジュールを設定
    default_args=default_args,
    catchup=False,
)

# テーブル作成
create_table = bigquery_operator.BigQueryOperator(
    task_id='create_table',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    sql='/sql/sample.sql', # ⑤実行するSQLのファイルを指定
    params={'gcppj':gcppj, 'gcpds':gcpds, 'yesterday':yesterday}, # ⑥渡すパラメーターの設定
    destination_dataset_table='{}.{}.analytics_table'.format(gcppj, gcpds),
    execution_timeout=timedelta(minutes=15), # ⑦実行制限時間
    dag=dag
)

# gcs出力
export_csv = BigQueryToCloudStorageOperator(
    task_id='export_csv',
    source_project_dataset_table='{}.{}.analytics_table'.format(gcppj, gcpds),
    destination_cloud_storage_uris='gs://{}/analytics_{}.csv'.format(gcs, yesterday),
    compression=None,
    export_format='CSV',
    field_delimiter=',',
    print_header=True,
    bigquery_conn_id='bigquery_default',
    delegate_to=None,
    labels=None,
    execution_timeout=timedelta(minutes=10),
    dag=dag
    )

# ⑧依存関係定義
create_table >> export_csv

①Variablesから値を取得

Airflowから環境変数を設定し、その値を取り出すことができます。jsonファイルでのインポートも可能で、Variables.jsonをインポートすると以下のようにセットされます。

AirflowのウェブサーバはComposerのコンソールから遷移できます。

②ワークフロー失敗時のアラート通知

ワークフロー失敗時にメールやslack通知が可能です。今回はslack通知の設定をしています。

③前のタスクが正常に完了したら次のタスクを実行

この値をTrueにするとで設定できます。

④UTC時間でスケジュールを設定

実行時間を指定できます。注意点としてAirflowはUTC時間なので、実行したい日本時間から-9時間引いて設定します。

⑤実行するSQLのファイルを指定

Dagsから相対パスで指定することが可能です。

⑥渡すパラメーターの設定

ここではSQL内で利用するパラメータを渡しています。

⑦実行制限時間

タスクの制限時間を指定します。ここの設定がないと、永遠にタスクが実行中となる場合があります。

⑧依存関係定義

依存関係の定義をします。

Aを実行→B,Cを実行→Dを実行という場合であれば、A >> [b,c] >> dというように定義します。

手動デプロイと実行

DAGフォルダを格納するGoogle Cloud Storageへファイルを置くと、Airflowへ反映されます。

ComposerのコンソールからDAGフォルダへ遷移し、ファイル・フォルダのアップロードをします。

Airflowウェブサーバへアクセスすると、先ほどアップロードしたsample-workflow DAGが反映されています。

成功すると、BigQueryにテーブルが作成され、Google Cloud StorageにCSVファイルが出力されています。

Cloud Source RepositoriesとCloud BuildによるCI/CD

Cloud Source RepositoriesCloud Buildを利用することで、ビルド・テスト・デプロイを自動化することができます。

今回はmasterにpushされたら自動でGoogle Cloud Storageに反映するように設定してみます。

GithubとCloud Source Repositoriesを連携

まずはGithubとCloud Source Repositoriesを連携します。

コンソールからCloud Source Repositoriesへ遷移し、開始からリポジトリの作成へと進みます。APIの有効化画面に遷移した場合は、有効にしてください。

Cloud Source Repositoriesに何かしらのリポジトリがないと、外部リポジトリを接続した際に”リポジトリに接続できませんでした”というエラーが発生します。

そのため、まずはリポジトリの追加で”新しいリポジトリの作成”を選択して、空のリポジトリを作ります。

すべてのリポジトリへ移り、リポジトリの追加で今度は”外部リポジトリの接続”を選択します。

GCPプロジェクトとGitプロバイダを選択して、認証情報にチェックします。

GitHubとGCPの連携認証を求められるので、GitHubにログインして画面の表示通りに進めます。認証が済むとリポジトリ一覧が表示されるので、対象のリポジトリを選択して接続します。

Cloud Buildの設定

Cloud Buildの画面へ遷移し、まずはCloud BuildからGoogle Cloud Storage、Cloud Composerへのアクセス権限を付与します。設定のサービスアカウントをコピーし、IAMと管理でストレージ管理者・Composer管理者を付与します。

Cloud Build画面へ戻り、”ビルドトリガーを設定”で①〜④の部分を設定し、トリガーを作成します。

①プロジェクトで一意の名前をつけます。

②デプロイするイベントを選択します。今回は”ブランチにpushする”を選択します。

③ソースを選択します。Cloud Source Repositoriesで作成したリポジトリを選択し、ブランチは^master$にします。

④masterにpushされたときに動かす設定ファイルを選択します。”Cloud Build構成ファイル(yaml または json)”を選択し、Locationはリポジトリにします。

続いてcloudbuild.yamlファイルを作成します。dags配下をGoogle Cloud Storageへコピーする処理をyamlファイルに記述します。

steps:
- name: 'gcr.io/cloud-builders/gcloud'
  entrypoint: 'bash'
  args:
    - '-exc'
    - |
      gsutil cp -r dags/* gs://xxx/dags

gs://xxx/dags部分は、DAGSフォルダを格納するGoogle Cloud StorageのURIへ変更します。

追加したyamlファイルをmasterへpushするとCloud Buildが実行され、成功するとソースコードがGoogle Clouod Storageへ反映されます。

コスト

Cloud Composerの料金は、環境スペックと実行時間のほか、実行環境でCompute Engine、データの保存でGoogle Cloud Storageのコストがかかります。

試算ツールで計算すると、最小構成であるノード数3つの場合1ヶ月に約43,000円ほどかかります。

まとめ

Cloud Composerでワークフローを構築する方法をご紹介しました。

オンプレからクラウドまでの接続コネクタがあるので、取り込みから出力まで一連のワークフローの実装が可能です。

BigQuery内で完結できるワークフローにはDataformを利用し、BigQuery以外も利用する場合はCloud Composerという使い分けができるように思いました。

Dataformの使い方は以前の記事をご参照ください。この記事が参考になれば幸いです!