UUUMエンジニアブログ

UUUMのエンジニアによる技術ブログです

AWS Step FunctionsとAmazon AthenaでCSVデータを自動集計したい!

はじめに

こんにちは、UUUMに入社して3年ほど経つishiharaです。

私が今担当しているプロジェクトに大量のCSVデータを集計するシステムがあり、そこで最近いくつもの集計作業を一連のワークフローとして自動化する仕組みを構築したので簡単に内容を紹介していこうと思います。

利用サービスについて

AWS Step Functionsとは

Step Functionsは、AWSの様々なサービスと連携して、1つのワークフローとなるアプリケーションを構築できるサービスです。

Step Functionsはステートマシンとタスクから構成されています。ここでのステートマシンはワークフローのことで、タスクはワークフロー内の1つ1つの状態(ステップ)になります。
例えば、 1,2,44,32,5433,... のような適当な数値の羅列からなるCSVデータがあったとして、そこから5以上の数値を取り出したいとします。
作業として、
CSVデータの取得→5以上の数値を抜き出す→抜き出したデータの保存(や通知)
といった一連の流れがワークフローであり、→で区切られた1つ1つの作業がタスクとなります。

また、1つのワークフローは1つのJSONファイルから構成されており、Amazon States Language に則って定義していくことでワークフローを実行することができます。

Amazon Athenaとは

↓を参考にしてください。

system.blog.uuum.jp

構成の概要

構成イメージ

下準備

ワークフロー作成前に、バケットを用意してデータを格納し、Athenaでテーブル定義してSQLを実行できるようにしましょう。

データは下記のようなデータを用意したとします。

sample-athena-step-functions-bucket.csv

video_id channel_id views video_type date
WJzSBLCaKc8 UCZf_ehlCEBPop-sldpBUQ 103200 none 2020-11-01
WJzSBLCaKc8 UCZf_ehlCEBPop-sldpBUQ 12132 none 2020-11-02
FUb-lE0tQC4 UCZf_ehlCEBPop-sldpBUQ 100031 short 2020-11-01
FUb-lE0tQC4 UCZf_ehlCEBPop-sldpBUQ 10001 short 2020-11-02
uJwa7y6Q6Sw UCgMPP6RRjktV7krOfyUewqw 102203 none 2020-11-01
uJwa7y6Q6Sw UCgMPP6RRjktV7krOfyUewqw 12203 none 2020-11-02

S3バケット

テーブル定義のクエリ

CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`sample_athena_step_functions_bucket` (
  `video_id` string,
  `channel_id` string,
  `views` int,
  `video_type` string,
  `date` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('separatorChar' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://sample-athena-step-functions-bucket/'
TBLPROPERTIES (
  'classification' = 'csv',
  'skip.header.line.count' = '1'
);

テーブル作成

Athenaを実行する

まずはStep Functionsのサービスへ行き、ステートマシンを作成しましょう。 作成方法はデフォルトのまま次へ進んでいきましょう。

ステートマシンの作成

そうすると下記のような画面になると思います。
ここでは、Athenaのクエリを実行するためのAPIのAthena: StartQueryExecution をタスクとしてワークフローに追加します。 そしたら状態名を任意で変更し、APIに渡すパラメータをJSON形式で入力していきます。

タスクの追加

APIパラメータ

{
  "QueryExecutionContext": {
    "Database": "default"
  },
  "QueryString": "select video_id, sum(views) as views from sample_athena_step_functions_bucket group by video_id",
  "ResultConfiguration": {
    "OutputLocation": "s3://sample-athena-query-result/"
  },
  "WorkGroup": "primary"
}

注)OutputLocationで指定しているバケットは、Athenaのクエリ実行時にデータを格納する場所で、なければ作成する必要があります。

入力したらあとはそのまま進んでステートマシン名とロールを適宜入力して作成します。
作成したら「実行の開始」を押下してみてください。

作成されたオブジェクト

ワークフローが周り、クエリの実行結果がCSV形式で指定のバケットに保存されていることが確認できます。

ただ、ファイル名がこのようでは少しわかりづらいですよね。
次にファイル名を変更し、ついでに実行結果を格納するのとは別に必要なレポートを格納するバケットを別に用意しようと思います。

ファイル名を変更する

実行結果を格納するバケットとは別に新たに「sample-step-functions-report」というバケットを用意します。
その後、ステートマシンの編集画面に戻り、先程と同様に今度はAPIの S3: CopyObject というタスクを追加します。

タスクの追加

APIパラメータ

{
  "Bucket": "sample-step-functions-report",
  "CopySource.$": "States.Format('sample-athena-query-result/{}.csv', $.output.QueryExecutionId)",
  "Key": "sample_report_group_by_video_id.csv"
}

こちらにある CopySource.$.$ はパラメータを参照するためのものです。
AWS Step Functionsの組み込み関数であるStates.Format関数を利用してパラメータを展開しています。
また、ここで利用しているパラメータは1つ前のタスクでの実行結果であり、デフォルトで引き渡されるものではないので1つ前のタスクから引き渡されるようにしないといけません。

パラメータの引き渡し

最後に、このままではAthenaによってオブジェクトがS3に生成される前にCopyObjectを実行しようとしてエラーが出てしまうのでエラー処理も施します。

エラー処理

これで準備完了です。 先程のように「実行の開始」を押下するとファイル名が変更され、指定のバケットに保存されたことが確認できると思います。

レポートの作成

並列処理にする

最後に、直列の処理だけではなく、並列して実行できる作業がある場合、並列処理にすることで集計スピードを早めることができます。 動画ごとにまとめる以外にチャンネル別に集計したい場合、それぞれ並列して処理をしてみたいと思います。

並列処理

並列処理を追加したら、あとはこれまでと同様にタスクを追加すれば完了です。APIパラメータや状態名を必要に応じて変更していきましょう。

チャンネルごとにまとめる

{
  "QueryExecutionContext": {
    "Database": "default"
  },
  "QueryString": "select channel_id, sum(views) as views from sample_athena_step_functions_bucket group by channel_id",
  "ResultConfiguration": {
    "OutputLocation": "s3://sample-athena-query-result/"
  },
  "WorkGroup": "primary"
}

チャンネルごとにまとめたファイル名の変更

{
  "Bucket": "sample-step-functions-report",
  "CopySource.$": "States.Format('sample-athena-query-result/{}.csv', $.output.QueryExecutionId)",
  "Key": "sample_report_group_by_channel_id.csv"
}

レポートの作成

まとめ

いかがでしたでしょうか。
ここでの内容は基礎的なものであり、実際に業務で利用する場合にはさらにいくつものAWSのサービスと連携していく必要はあるかと思いますが、Step Functionsを活用すれば自動化できる作業の範囲が広がっていくと思います。
また、並列処理などを活用することで自動化による作業コストの削減だけでなく、作業スピードの改善につなげることもできます。
この内容が少しでも皆さんのお役に立てば幸いです。