UUUM攻殻機動隊(エンジニアブログ)

UUUMのエンジニアによる技術ブログです

Sidekiqが動く仕組みについて

UUUMに入社して1年が経ったエンジニアの北畠です。 Railsも使い始めてもう半年経ちました。

そこで、Railsのジョブキューのgemで有名なSidekiq。

UUUMのプロダクトでもかなり使っているんですが、

どのように動いているかよくわかっていないので調べてみました。

基本的な使い方は

Getting Started · mperham/sidekiq Wiki · GitHub

から

まずはエンキュー

ActibeJob.perform_later

キューに入れたい処理はActiveJob::Baseを拡張したクラスのperformメソッドに実装し、

perform_laterを呼び出すと、エンキューの処理が走る。

Class SampleJob < ActiveJob::Base
  def perform
    p "Hello!"
  end
end

...

SampleJob.perform_later

queue_adapter.enqueue

perform_laterは ActiveJob.queue_adapter に設定されているオブジェクトの enqueue にjobインスタンスを投げます。 queue_adapterの設定はデフォルトでは、 :async(AsyncAdapter) で、configで以下のように指定すると、SidekiqAdapterが設定されます。

config.active_job.queue_adapter = :sidekiq

他にどんなアダプターがあるかは、activjob/queue_adapters.rb のコメントに結構細かく書かれています。

rails/queue_adapters.rb at master · rails/rails · GitHub

SidekiqAdapter.enqueue

def enqueue(job) #:nodoc:
  #Sidekiq::Client does not support symbols as keys
  job.provider_job_id = Sidekiq::Client.push \
    'class'   => JobWrapper,
    'wrapped' => job.class.to_s,
    'queue'   => job.queue_name,
    'args'    => [ job.serialize ]
end

こんな感じ。 ジョブのクラス名や引数を Sidekiq::Client に投げている。

class として指定されている。JobWrapper の実装はかなり薄くて、

class JobWrapper #:nodoc:
  include Sidekiq::Worker

  def perform(job_data)
    Base.execute job_data.merge('provider_job_id' => jid)
  end
end

これだけです。 Base.executeは受け取ったjob_dataをdeserializeして、performを実行します。

Sidekiq.client

Sidekiq.clientのpush処理では、受け取ったジョブ情報をRedis上に保存できる状態に成形し、ジョブIDの情報を追加して、 Redisの queues:#{queue_name} キーに追加します。

サンプルで追加してみた中身はこんな感じのjsonです。

{
    "class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
    "wrapped": "SampleJob",
    "queue": "default",
    "args": [
        {
            "job_class": "SampleJob",
            "job_id": "ae214f55-3d84-4037-8086-be111661ec6a",
            "queue_name": "default",
            "priority": null,
            "arguments": [],
            "locale": "ja"
        }
    ],
    "retry": true,
    "jid": "d1ecd3aa4bf5340b5c2a52f6",
    "created_at": 1506733202.404303,
    "enqueued_at": 1506733202.405045
}

ジョブの取得と実行

Redisにジョブを追加するだけではジョブは実行されません。 ジョブ情報を取得して、実行する必要があります。

取得

bundle exec sidekiq を実行することで、ジョブを取得、実行するプロセスを起動できます。

プロセスが起動すると、Sidekiq::Launcher.run が実行されます。

sidekiq/launcher.rb at master · mperham/sidekiq · GitHub

まず、Sidekiq::Managerがworkerを作成し、起動します。(デフォルトの設定では25個作成されます。)

workerはそれぞれのスレッドを起動し、ジョブ情報をRedisから取得しに行きます。

wokerの中身は Sidekiq::Processor で、Processor.get_one から下記の Sidekiq::BasicFetch.retrieve_work が呼び出され、Redisからジョブ情報を取得しています。

sidekiq/fetch.rb at master · mperham/sidekiq · GitHub

実行

Redisから取得したジョブ情報を元に、ジョブのクラスと引数情報を deserialize して、worker上のスレッドで実行されます

sidekiq/processor.rb at master · mperham/sidekiq · GitHub

performが呼び出されてます。

Railsのloadは元のプロセス起動時に行われています。

sidekiq/cli.rb at master · mperham/sidekiq · GitHub

scheduled

下記のように waitwait_untill を設定した場合はどうなるか?

SampleJob.set(wait: 5.minutes).perform_later

エンキュー

指定された値から、実行する時刻を計算し、 queue_adapterの enqueue_at にジョブ情報と一緒に渡されます。

#SidekiqAdapter.enqueue_at
def enqueue_at(job, timestamp) #:nodoc:
  job.provider_job_id = Sidekiq::Client.push \
    'class'   => JobWrapper,
    'wrapped' => job.class.to_s,
    'queue'   => job.queue_name,
    'args'    => [ job.serialize ],
    'at'      => timestamp
end

Sidekiq::Client側の処理

Sidekiq::Client側の処理では、 at が指定されていると sorted set に timestampでソートする形でジョブ情報が登録されます。 (上記の通常(at指定なし)のジョブ情報リストとは違うキーに保持されます)

取得、実行

Sidekiqのプロセスは上記で説明した プロセスを実行する woker を管理する manager の他に、 at 指定で登録されているジョブを ジョブキューに登録する poller が動いています。

pollerは5秒ごとに、 sorted set から 現在時刻より前 の timestamp のジョブ情報がないかを見に行き、 対象のジョブがあった場合は Sidekiq::Client::push を呼び出し、 sorted set から ジョブキューへジョブ情報を移します。

この処理により、 at で指定したジョブも worker達が処理する対象になってくれます。

retry

プロセスでエラーが起きた時、

retry が有効な場合で、retry数がまだMAX retry数内(デフォルトは25)の場合は、ジョブ情報が retry queue に追加されます。

retry queue に保持される情報は、通常のジョブ情報に、エラー情報とリトライ回数などの情報が加えらえれたものになります。

上記で話した scheduledのキューを処理する poller は、retryのキュー情報も見るようになっているので、retryに入れられた情報は、

自然とジョブキューへ追加されます。

リトライ実行のディレイはリトライ数が多くなるごとに指数関数的に多くなるようになっています。

ディレイ時間の計算処理 sidekiq/job_retry.rb at master · mperham/sidekiq · GitHub

リトライ設定がfalseの場合と、リトライ数が上限を超えたジョブ情報は dead というキーの sorted set に 失敗時刻が sort 対象の項目として、登録されます。

dead データセットは、オプションの dead_max_jobs, dead_timeout_in_seconds の設定を元に、

失敗したジョブ情報を保持しています。

デフォルトの設定は下記となっています

deed_max_jobs: 1万
dead_timeout_in_seconds: 6ヶ月

となっています。

sidekiq-cron

Sidekiqをcron形式の設定でスケジューリングするgemが sidekiq-cron です。

github.com

設定ファイルの読み込み

YAMLの設定ファイルを読み込む場合は、 config/initializers などに

Sidekiq::Cron::Job.load_from_hash YAML.load_file(#{filepath})

のように指定することで、Redis上に、cronの設定情報が保存されます。

poller

sidekiq-cronでプロセスを持つことはなく、sidekiqのlauncher を上書きすることで実現しています。

# Sidekiq::Lancher

# remember old run
alias_method :old_run, :run

# execute normal run of launcher and run cron poller
def run
  old_run
  cron_poller.start
end

sidekiqの manager, poller とは別に、 cron_poller の処理を start しているのが解ります。

cron_poller の 処理は、上記でRedisに取り込んだ cron の設定を定期的に見に行って、ジョブキューへ追加する処理をやっています。

Redisの使い方のまとめ

今まで説明した中でRedisでいくつかのキーを使っていましたが、どんな使い方をしているか整理してみます。

queue周り

queues(set型)

queue の種類のリスト。

デフォルトでは、 default のみです。

queue:{queue name}(list型)

job queue の本体。

lpush, brpop することで、ジョブ情報の追加、取得を行っています。

タイムスタンプでのソート済みセット

遅延実行(key: schedule)、リトライ(key: retry)、cron(key: cron)はタイムスタンプを score としたソート済みセットとして実装されています。

zrangebyscore で- inf から now を定期的に取りに行くことで、実行時間になったジョブを queue に追加しています。

その他

ジョブの成功数、失敗数の情報を文字列型で持っていたり、

sidekiq-cronでは、yamlなどで書いた設定を Redis側にloadしていたり、いろいろな使い方をしています。

おわりに

Sidekiqなどのジョブキューの実装は自分の中ではブラックボックスで、結構複雑な感じがしていましたが、

実際に見てみると、結構シンプルで解りやすかったかなと思います。

また、Redisの色々なデータ型を駆使しているプログラムなので、Redisを使ったアプリケーションの見本のストックとして認識しておけば、自分がRedisを色々使うときにも参考になるのではないでしょうか!