はじめに
こんにちは、UUUMに入社して3年ほど経つishiharaです。
私が今担当しているプロジェクトに大量のCSVデータを集計するシステムがあり、そこで最近いくつもの集計作業を一連のワークフローとして自動化する仕組みを構築したので簡単に内容を紹介していこうと思います。
利用サービスについて
AWS Step Functionsとは
Step Functionsは、AWSの様々なサービスと連携して、1つのワークフローとなるアプリケーションを構築できるサービスです。
Step Functionsはステートマシンとタスクから構成されています。ここでのステートマシンはワークフローのことで、タスクはワークフロー内の1つ1つの状態(ステップ)になります。
例えば、 1,2,44,32,5433,...
のような適当な数値の羅列からなるCSVデータがあったとして、そこから5以上の数値を取り出したいとします。
作業として、
CSVデータの取得→5以上の数値を抜き出す→抜き出したデータの保存(や通知)
といった一連の流れがワークフローであり、→で区切られた1つ1つの作業がタスクとなります。
また、1つのワークフローは1つのJSONファイルから構成されており、Amazon States Language に則って定義していくことでワークフローを実行することができます。
Amazon Athenaとは
↓を参考にしてください。
構成の概要
下準備
ワークフロー作成前に、バケットを用意してデータを格納し、Athenaでテーブル定義してSQLを実行できるようにしましょう。
データは下記のようなデータを用意したとします。
sample-athena-step-functions-bucket.csv
video_id | channel_id | views | video_type | date |
---|---|---|---|---|
WJzSBLCaKc8 | UCZf_ehlCEBPop-sldpBUQ | 103200 | none | 2020-11-01 |
WJzSBLCaKc8 | UCZf_ehlCEBPop-sldpBUQ | 12132 | none | 2020-11-02 |
FUb-lE0tQC4 | UCZf_ehlCEBPop-sldpBUQ | 100031 | short | 2020-11-01 |
FUb-lE0tQC4 | UCZf_ehlCEBPop-sldpBUQ | 10001 | short | 2020-11-02 |
uJwa7y6Q6Sw | UCgMPP6RRjktV7krOfyUewqw | 102203 | none | 2020-11-01 |
uJwa7y6Q6Sw | UCgMPP6RRjktV7krOfyUewqw | 12203 | none | 2020-11-02 |
テーブル定義のクエリ
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`sample_athena_step_functions_bucket` ( `video_id` string, `channel_id` string, `views` int, `video_type` string, `date` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ('separatorChar' = ',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://sample-athena-step-functions-bucket/' TBLPROPERTIES ( 'classification' = 'csv', 'skip.header.line.count' = '1' );
Athenaを実行する
まずはStep Functionsのサービスへ行き、ステートマシンを作成しましょう。 作成方法はデフォルトのまま次へ進んでいきましょう。
そうすると下記のような画面になると思います。
ここでは、Athenaのクエリを実行するためのAPIのAthena: StartQueryExecution
をタスクとしてワークフローに追加します。
そしたら状態名を任意で変更し、APIに渡すパラメータをJSON形式で入力していきます。
APIパラメータ
{ "QueryExecutionContext": { "Database": "default" }, "QueryString": "select video_id, sum(views) as views from sample_athena_step_functions_bucket group by video_id", "ResultConfiguration": { "OutputLocation": "s3://sample-athena-query-result/" }, "WorkGroup": "primary" }
注)OutputLocationで指定しているバケットは、Athenaのクエリ実行時にデータを格納する場所で、なければ作成する必要があります。
入力したらあとはそのまま進んでステートマシン名とロールを適宜入力して作成します。
作成したら「実行の開始」を押下してみてください。
ワークフローが周り、クエリの実行結果がCSV形式で指定のバケットに保存されていることが確認できます。
ただ、ファイル名がこのようでは少しわかりづらいですよね。
次にファイル名を変更し、ついでに実行結果を格納するのとは別に必要なレポートを格納するバケットを別に用意しようと思います。
ファイル名を変更する
実行結果を格納するバケットとは別に新たに「sample-step-functions-report」というバケットを用意します。
その後、ステートマシンの編集画面に戻り、先程と同様に今度はAPIの S3: CopyObject
というタスクを追加します。
APIパラメータ
{ "Bucket": "sample-step-functions-report", "CopySource.$": "States.Format('sample-athena-query-result/{}.csv', $.output.QueryExecutionId)", "Key": "sample_report_group_by_video_id.csv" }
こちらにある CopySource.$
の .$
はパラメータを参照するためのものです。
AWS Step Functionsの組み込み関数であるStates.Format関数を利用してパラメータを展開しています。
また、ここで利用しているパラメータは1つ前のタスクでの実行結果であり、デフォルトで引き渡されるものではないので1つ前のタスクから引き渡されるようにしないといけません。
最後に、このままではAthenaによってオブジェクトがS3に生成される前にCopyObjectを実行しようとしてエラーが出てしまうのでエラー処理も施します。
これで準備完了です。 先程のように「実行の開始」を押下するとファイル名が変更され、指定のバケットに保存されたことが確認できると思います。
並列処理にする
最後に、直列の処理だけではなく、並列して実行できる作業がある場合、並列処理にすることで集計スピードを早めることができます。 動画ごとにまとめる以外にチャンネル別に集計したい場合、それぞれ並列して処理をしてみたいと思います。
並列処理を追加したら、あとはこれまでと同様にタスクを追加すれば完了です。APIパラメータや状態名を必要に応じて変更していきましょう。
チャンネルごとにまとめる
{ "QueryExecutionContext": { "Database": "default" }, "QueryString": "select channel_id, sum(views) as views from sample_athena_step_functions_bucket group by channel_id", "ResultConfiguration": { "OutputLocation": "s3://sample-athena-query-result/" }, "WorkGroup": "primary" }
チャンネルごとにまとめたファイル名の変更
{ "Bucket": "sample-step-functions-report", "CopySource.$": "States.Format('sample-athena-query-result/{}.csv', $.output.QueryExecutionId)", "Key": "sample_report_group_by_channel_id.csv" }
まとめ
いかがでしたでしょうか。
ここでの内容は基礎的なものであり、実際に業務で利用する場合にはさらにいくつものAWSのサービスと連携していく必要はあるかと思いますが、Step Functionsを活用すれば自動化できる作業の範囲が広がっていくと思います。
また、並列処理などを活用することで自動化による作業コストの削減だけでなく、作業スピードの改善につなげることもできます。
この内容が少しでも皆さんのお役に立てば幸いです。