UUUMに入社して1年が経ったエンジニアの北畠です。 Railsも使い始めてもう半年経ちました。
そこで、Railsのジョブキューのgemで有名なSidekiq。
UUUMのプロダクトでもかなり使っているんですが、
どのように動いているかよくわかっていないので調べてみました。
基本的な使い方は
Getting Started · mperham/sidekiq Wiki · GitHub
から
まずはエンキュー
ActibeJob.perform_later
キューに入れたい処理はActiveJob::Baseを拡張したクラスのperformメソッドに実装し、
perform_laterを呼び出すと、エンキューの処理が走る。
Class SampleJob < ActiveJob::Base def perform p "Hello!" end end ... SampleJob.perform_later
queue_adapter.enqueue
perform_laterは ActiveJob.queue_adapter に設定されているオブジェクトの enqueue にjobインスタンスを投げます。 queue_adapterの設定はデフォルトでは、 :async(AsyncAdapter) で、configで以下のように指定すると、SidekiqAdapterが設定されます。
config.active_job.queue_adapter = :sidekiq
他にどんなアダプターがあるかは、activjob/queue_adapters.rb のコメントに結構細かく書かれています。
rails/queue_adapters.rb at master · rails/rails · GitHub
SidekiqAdapter.enqueue
def enqueue(job) #:nodoc: #Sidekiq::Client does not support symbols as keys job.provider_job_id = Sidekiq::Client.push \ 'class' => JobWrapper, 'wrapped' => job.class.to_s, 'queue' => job.queue_name, 'args' => [ job.serialize ] end
こんな感じ。 ジョブのクラス名や引数を Sidekiq::Client に投げている。
class として指定されている。JobWrapper の実装はかなり薄くて、
class JobWrapper #:nodoc: include Sidekiq::Worker def perform(job_data) Base.execute job_data.merge('provider_job_id' => jid) end end
これだけです。 Base.executeは受け取ったjob_dataをdeserializeして、performを実行します。
Sidekiq.client
Sidekiq.clientのpush処理では、受け取ったジョブ情報をRedis上に保存できる状態に成形し、ジョブIDの情報を追加して、
Redisの queues:#{queue_name}
キーに追加します。
サンプルで追加してみた中身はこんな感じのjsonです。
{ "class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", "wrapped": "SampleJob", "queue": "default", "args": [ { "job_class": "SampleJob", "job_id": "ae214f55-3d84-4037-8086-be111661ec6a", "queue_name": "default", "priority": null, "arguments": [], "locale": "ja" } ], "retry": true, "jid": "d1ecd3aa4bf5340b5c2a52f6", "created_at": 1506733202.404303, "enqueued_at": 1506733202.405045 }
ジョブの取得と実行
Redisにジョブを追加するだけではジョブは実行されません。 ジョブ情報を取得して、実行する必要があります。
取得
bundle exec sidekiq
を実行することで、ジョブを取得、実行するプロセスを起動できます。
プロセスが起動すると、Sidekiq::Launcher.run が実行されます。
sidekiq/launcher.rb at master · mperham/sidekiq · GitHub
まず、Sidekiq::Managerがworkerを作成し、起動します。(デフォルトの設定では25個作成されます。)
workerはそれぞれのスレッドを起動し、ジョブ情報をRedisから取得しに行きます。
wokerの中身は Sidekiq::Processor で、Processor.get_one から下記の Sidekiq::BasicFetch.retrieve_work が呼び出され、Redisからジョブ情報を取得しています。
sidekiq/fetch.rb at master · mperham/sidekiq · GitHub
実行
Redisから取得したジョブ情報を元に、ジョブのクラスと引数情報を deserialize して、worker上のスレッドで実行されます
sidekiq/processor.rb at master · mperham/sidekiq · GitHub
performが呼び出されてます。
Railsのloadは元のプロセス起動時に行われています。
sidekiq/cli.rb at master · mperham/sidekiq · GitHub
scheduled
下記のように wait
や wait_untill
を設定した場合はどうなるか?
SampleJob.set(wait: 5.minutes).perform_later
エンキュー
指定された値から、実行する時刻を計算し、 queue_adapterの enqueue_at にジョブ情報と一緒に渡されます。
#SidekiqAdapter.enqueue_at def enqueue_at(job, timestamp) #:nodoc: job.provider_job_id = Sidekiq::Client.push \ 'class' => JobWrapper, 'wrapped' => job.class.to_s, 'queue' => job.queue_name, 'args' => [ job.serialize ], 'at' => timestamp end
Sidekiq::Client側の処理
Sidekiq::Client側の処理では、 at
が指定されていると sorted set に timestampでソートする形でジョブ情報が登録されます。
(上記の通常(at
指定なし)のジョブ情報リストとは違うキーに保持されます)
取得、実行
Sidekiqのプロセスは上記で説明した プロセスを実行する woker を管理する manager の他に、
at
指定で登録されているジョブを ジョブキューに登録する poller が動いています。
pollerは5秒ごとに、 sorted set から 現在時刻より前
の timestamp のジョブ情報がないかを見に行き、
対象のジョブがあった場合は Sidekiq::Client::push を呼び出し、 sorted set から ジョブキューへジョブ情報を移します。
この処理により、 at
で指定したジョブも worker達が処理する対象になってくれます。
retry
プロセスでエラーが起きた時、
retry が有効な場合で、retry数がまだMAX retry数内(デフォルトは25)の場合は、ジョブ情報が retry queue に追加されます。
retry queue に保持される情報は、通常のジョブ情報に、エラー情報とリトライ回数などの情報が加えらえれたものになります。
上記で話した scheduledのキューを処理する poller は、retryのキュー情報も見るようになっているので、retryに入れられた情報は、
自然とジョブキューへ追加されます。
リトライ実行のディレイはリトライ数が多くなるごとに指数関数的に多くなるようになっています。
ディレイ時間の計算処理 sidekiq/job_retry.rb at master · mperham/sidekiq · GitHub
リトライ設定がfalseの場合と、リトライ数が上限を超えたジョブ情報は dead
というキーの sorted set に 失敗時刻が sort 対象の項目として、登録されます。
dead
データセットは、オプションの dead_max_jobs
, dead_timeout_in_seconds
の設定を元に、
失敗したジョブ情報を保持しています。
デフォルトの設定は下記となっています
deed_max_jobs: 1万 dead_timeout_in_seconds: 6ヶ月
となっています。
sidekiq-cron
Sidekiqをcron形式の設定でスケジューリングするgemが sidekiq-cron です。
設定ファイルの読み込み
YAMLの設定ファイルを読み込む場合は、 config/initializers などに
Sidekiq::Cron::Job.load_from_hash YAML.load_file(#{filepath})
のように指定することで、Redis上に、cronの設定情報が保存されます。
poller
sidekiq-cronでプロセスを持つことはなく、sidekiqのlauncher を上書きすることで実現しています。
# Sidekiq::Lancher # remember old run alias_method :old_run, :run # execute normal run of launcher and run cron poller def run old_run cron_poller.start end
sidekiqの manager, poller とは別に、 cron_poller の処理を start しているのが解ります。
cron_poller の 処理は、上記でRedisに取り込んだ cron の設定を定期的に見に行って、ジョブキューへ追加する処理をやっています。
Redisの使い方のまとめ
今まで説明した中でRedisでいくつかのキーを使っていましたが、どんな使い方をしているか整理してみます。
queue周り
queues(set型)
queue の種類のリスト。
デフォルトでは、 default のみです。
queue:{queue name}(list型)
job queue の本体。
lpush, brpop することで、ジョブ情報の追加、取得を行っています。
タイムスタンプでのソート済みセット
遅延実行(key: schedule)、リトライ(key: retry)、cron(key: cron)はタイムスタンプを score としたソート済みセットとして実装されています。
zrangebyscore で- inf から now を定期的に取りに行くことで、実行時間になったジョブを queue に追加しています。
その他
ジョブの成功数、失敗数の情報を文字列型で持っていたり、
sidekiq-cronでは、yamlなどで書いた設定を Redis側にloadしていたり、いろいろな使い方をしています。
おわりに
Sidekiqなどのジョブキューの実装は自分の中ではブラックボックスで、結構複雑な感じがしていましたが、
実際に見てみると、結構シンプルで解りやすかったかなと思います。
また、Redisの色々なデータ型を駆使しているプログラムなので、Redisを使ったアプリケーションの見本のストックとして認識しておけば、自分がRedisを色々使うときにも参考になるのではないでしょうか!