A quick memo, because Apache Airflow turned out to be really handy once I started using it.
While looking for a way to hook Airflow up to BigQuery, I found the blog post below.
How to aggregate data for BigQuery using Apache Airflow | Google Cloud Blog
However, it uses Legacy SQL in places and some of the style didn't sit right with me, so here is my reworked version.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

default_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=10),
    'depends_on_past': True,
    'gcp_conn_id': 'google_cloud_default',
    'use_legacy_sql': False,
}

with DAG(
    dag_id='bq_hn_trends',
    default_args=default_args,
    start_date=datetime(2021, 6, 1),
    schedule_interval='0 0 * * *'
) as dag:

    # Check that yesterday's githubarchive day table exists.
    bq_check_githubarchive_day = BigQueryCheckOperator(
        task_id='bq_check_githubarchive_day',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date
        FROM
          `githubarchive.day.{{ yesterday_ds_nodash }}`
        WHERE
          type IN ('WatchEvent', 'ForkEvent')
        """
    )

    # Check that yesterday's stories are present in the Hacker News public dataset.
    bq_check_hackernews_full = BigQueryCheckOperator(
        task_id='bq_check_hackernews_full',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', timestamp) AS date
        FROM
          `bigquery-public-data.hacker_news.full`
        WHERE 1=1
          AND type = 'story'
          AND FORMAT_TIMESTAMP('%Y%m%d', timestamp) = '{{ yesterday_ds_nodash }}'
        LIMIT 1
        """
    )

    # Aggregate yesterday's stars/forks per repo into a day partition.
    bq_write_to_github_daily_metrics = BigQueryOperator(
        task_id='bq_write_to_github_daily_metrics',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        WITH base AS (
          SELECT
            FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,
            actor.id AS actor_id,
            repo.name AS repo,
            type
          FROM
            `githubarchive.day.{{ yesterday_ds_nodash }}`
          WHERE
            type IN ('WatchEvent', 'ForkEvent')
        )
        SELECT
          date,
          repo,
          SUM(CASE WHEN type = 'WatchEvent' THEN 1 ELSE NULL END) AS stars,
          SUM(CASE WHEN type = 'ForkEvent' THEN 1 ELSE NULL END) AS forks
        FROM base
        GROUP BY date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'
    )

    # Roll up 1/7/28-day star and fork counts per repo.
    bq_write_to_github_agg = BigQueryOperator(
        task_id='bq_write_to_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select
          '{{ yesterday_ds_nodash }}' as date,
          repo,
          sum(stars) as stars_last_28_days,
          sum(case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}')
              then stars else null end) as stars_last_7_days,
          sum(case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
              then stars else null end) as stars_last_1_day,
          sum(forks) as forks_last_28_days,
          sum(case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}')
              then forks else null end) as forks_last_7_days,
          sum(case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
              then forks else null end) as forks_last_1_day
        from
          `my-project-id.github_trends.github_daily_metrics`
        where _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -27) }}') and timestamp('{{ yesterday_ds }}')
        group by date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_agg${{ yesterday_ds_nodash }}'
    )

    # Aggregate yesterday's Hacker News stories that link to GitHub.
    bq_write_to_hackernews_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select
          format_timestamp('%Y%m%d', timestamp) as date,
          `by` as submitter,
          id as story_id,
          regexp_extract(url, "https?://github.com/[^/]*/[^/#?]*") as url,
          sum(score) as score
        from
          `bigquery-public-data.hacker_news.full`
        where
          type = 'story'
          and timestamp > '{{ yesterday_ds }}'
          and timestamp < '{{ ds }}'
          and url like '%https://github.com%'
          and url not like '%github.com/blog/%'
        group by date, submitter, story_id, url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_agg${{ yesterday_ds_nodash }}'
    )

    # Join the Hacker News stories with the GitHub star/fork metrics.
    bq_write_to_hackernews_github_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        with base1 as (
          select
            *
          from
            `my-project-id.github_trends.hackernews_agg`
          where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        ),
        base2 as (
          select
            repo,
            concat('https://github.com/', repo) as url,
            stars_last_28_days,
            stars_last_7_days,
            stars_last_1_day,
            forks_last_28_days,
            forks_last_7_days,
            forks_last_1_day
          from
            `my-project-id.github_trends.github_agg`
          where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        )
        select
          base1.date as date,
          base1.url as github_url,
          base2.repo as github_repo,
          base1.score as hn_score,
          base1.story_id as hn_story_id,
          base2.stars_last_28_days as stars_last_28_days,
          base2.stars_last_7_days as stars_last_7_days,
          base2.stars_last_1_day as stars_last_1_day,
          base2.forks_last_28_days as forks_last_28_days,
          base2.forks_last_7_days as forks_last_7_days,
          base2.forks_last_1_day as forks_last_1_day
        from base1
        left join base2 on base1.url = base2.url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_github_agg${{ yesterday_ds_nodash }}'
    )

    # Check that the final partition was actually written.
    bq_check_hackernews_github_agg = BigQueryCheckOperator(
        task_id='bq_check_hackernews_github_agg',
        sql="""
        SELECT
          _PARTITIONTIME as pt
        FROM
          `my-project-id.github_trends.hackernews_github_agg`
        where format_timestamp('%Y%m%d', _PARTITIONTIME) = '{{ yesterday_ds_nodash }}'
        order by 1
        limit 1
        """
    )

    [bq_check_githubarchive_day >> bq_write_to_github_daily_metrics >> bq_write_to_github_agg,
     bq_check_hackernews_full >> bq_write_to_hackernews_agg] >> bq_write_to_hackernews_github_agg >> bq_check_hackernews_github_agg
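The DAG assumes a working `google_cloud_default` connection (referenced via `gcp_conn_id` in `default_args`). If you set Airflow up by hand like I do, a rough sketch of creating it with the 2.x CLI looks something like the following; the project ID and keyfile path are placeholders, and the exact keys expected in `--conn-extra` can differ between provider versions, so treat this as a sketch rather than a recipe.

$ airflow connections add google_cloud_default \
    --conn-type google_cloud_platform \
    --conn-extra '{"extra__google_cloud_platform__project": "my-project-id", "extra__google_cloud_platform__key_path": "/path/to/keyfile.json"}'

If Airflow runs on a GCE instance, you can usually skip the keyfile entirely and let the VM's service account be picked up via Application Default Credentials.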
The syntax of the backfill command changed a bit between Airflow 1.x and the Airflow 2.x I'm on now.
In 2.x it looks like this:
$ airflow dags backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends
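For comparison, the 1.x form of the same thing was, as far as I remember, roughly:

$ airflow backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends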
Cloud Composer is expensive, so I run Airflow myself on an e2-small instance.
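For the record, the self-hosted setup itself is nothing special; it's roughly the following (the `google` extra pulls in the BigQuery operators, and in practice you'd want to pin versions with the official constraints file):

$ pip install "apache-airflow[google]"
$ airflow db init
$ airflow webserver -D
$ airflow scheduler -D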
On an e2-micro it filled up its memory and froze lol.
Thanks to Airflow, the bash + cron setup I had been painstakingly running until now just feels silly, and there's no going back.