八発白中

はてなブログに引越しました。

Common Lisp用のメッセージキュー「Psychiq」を作りました

2年前にLesqueという、Common Lisp向けにRubyのResqueクローンを作りました。

メッセージキューやジョブキューという種類のプロダクトで、非同期処理を別のプロセスに移譲してバックグラウンドで行うものです。Redisをバックエンドとしています。

ある程度の規模のWebアプリケーションなんかを運用しているところなら必要になってくるだろうものですが、Common Lispでそういうプロダクトを作っているところがあるのかどうかも怪しいです。弊社サムライトはどうなんだと言われると苦しい。

ブログでLesqueの紹介記事は書いたんですが、その後使うことも無いしQuicklispにも申請せずメンテナンスもしていませんでした。特にせっつくようなコメントももらってないので需要もないのかもしれません。

blog.8arrow.org

物好きな僕のフォロワーがRedditにも投稿しています。

www.reddit.com

RをLに置き換えるなんて日本人のユーモアうける、みたいなことが書いてあります。

Sidekiqの台頭

それから2年が経って、状況も変わりました。以前はResque一択のような空気だったメッセージキューも、今はSidekiqという競合があります。

またメッセージキューが必要そうだなぁ、という機運が高まってきたので、この機会にもう一度Sidekiqのアーキテクチャをベースとして懲りずに再実装してみるかと思いました。

そして作ったのが「Psychiq」です。

Resque vs Sidekiq

ResqueもSidekiqもRedisを使ってジョブを処理するのは同じですし、SidekiqはResque互換であることも強調しています。

SidekiqのWebサイトを見ると他の競合製品より最大20倍速いなどと謳われています。高速で並列に大量のジョブを処理することを売りにしているようです。

スレッドベースのアーキテクチャ

違いはアーキテクチャにあります。

Resqueはワーカーをforkで子プロセスで立ち上げ、それぞれのワーカーがジョブをRedisからデキューして処理します。

一方のSidekiqはスレッドで処理します。スレッドのほうが軽量なので、多くのワーカーを起動できます。デフォルトでは25個のスレッドを立ち上げます。

また、スレッドのほうがメモリ空間を共有できるため、ワーカーをまたいだ処理が高速でシンプルに書けます。具体的には処理したジョブの数は失敗したジョブの数などの統計情報を記録する処理などがそれに当たります。

Redisに完全依存

SidekiqはバックエンドがRedis一択で、Resqueのように他のものを使うことができません。

これは欠点のように見えますが、実際はコードベースも小さくシンプルでメンタナブルに保つのに一躍買っているようです。バックエンドが増えるとそれぞれのバックエンドで挙動が変わらないことのテストとか書かないといけないし、大変だしね。

Psychiq

PsychiqはSidekiqのほぼ完全なCommon Lisp移植版です。Redisをバックエンドにしたメッセージキューで、Resque/Sidekiq互換になっています。

Worker (ジョブを処理するコード) を書く

まずは非同期処理をする部分を書きます。これはメインアプリケーションと、Psychiqプロセスで共有するコードなので独立したASDFシステムにしておくのが望ましいです。

引数を受け取って何か処理を行います。返り値は特に使われません。

(defclass my-worker (psy:worker) ())
(defmethod psy:perform ((worker my-worker) &rest args)
  ;; Do something
  )

メインアプリケーションでenqueueする

メインアプリケーションでジョブをenqueueします。先ほどのワーカークラスを指定し、引数リストを渡します。

;; ジョブをenqueueする
(psy:enqueue 'my-worker '("arg1" "arg2"))

;; 300秒後にジョブをenqueueする
(psy:enqueue-in-sec 300 'my-worker '("arg1" "arg2"))

;; 複数のジョブを同時に追加する
;; ループで回すよりも効率が良い
(psy:enqueue-bulk 'my-worker '("arg1" "arg2") '("another" "one") ...)

このデータはRedisに記録され、Psychiqにより処理されるのを待ちます。

Psychiqプロセスを立ち上げる

非同期でジョブを処理するPsychiqプロセスを立ち上げると処理が始まります。このプロセスは通常は立ち上げっぱなしにします。

Roswellスクリプトが提供されているのでコマンドラインから起動できます。そういえばLesqueを作ったときにはRoswellスクリプトなんてなかったんですよね。

現在はQuicklispに登録されていないのでまずはGitHubからインストールが必要です。*1

$ cd ~/common-lisp
$ git clone https://github.com/fukamachi/psychiq
$ ros -l psychiq/psychiq.asd install psychiq
# ~/.roswell/bin/psychiq がインストールされているはずです
$ psychiq --host localhost --port 6379 --system myapp-workers

これでメインアプリケーションでenqueueが行われるたびに、Redisを介してジョブがこのPsychiqプロセスに渡り、非同期で処理が行われます。

失敗したジョブはどうなるか

ジョブの処理中にerror conditionが発生するとジョブは失敗したものとして扱われます。

失敗したジョブはfailedキューに移動し、間を置いてから再度enqueueされて自動リトライされます。一定回数の失敗を繰り返すとdeadキューに移動します。

逆に言うなら、ジョブを処理中に失敗として扱いたいときはerrorを発生させれば良いわけです。

Web UI

PsychiqにはResqueやSidekiqのようなWeb UIはありません。しかし、Resque/Sidekiq互換なので、SidekiqのWeb UIを流用することができます。

以下のようなconfig.ruを用意し、

require 'sidekiq'

Sidekiq.configure_client do |config|
  config.redis = { :size => 1, url: 'redis://localhost:6379', namespace: 'psychiq' }
end

require 'sidekiq/web'
run Sidekiq::Web

Rackサーバを起動します。

$ rackup config.ru

http://localhost:9292/busyを開くと以下のような画面が見えます。

f:id:nitro_idiot:20160118174428p:plain

左上のロゴが「Sidekiq」なので不思議な感じがしますが、Psychiq用のRedisキューを読み込んで情報を表示しています。

ここから実行中のプロセスやジョブを一覧したり、失敗したジョブの手動リトライなどができます。

おわりに

以上、Psychiqを紹介しました。いつも通りGitHubで公開しています。ライセンスはLLGPLです。

そういえば、今月末はLisp Meetupがありますね。

lisp.connpass.com

今回のテーマはCommon Lispです。特にPsychiqの話をする予定はありません。

takagiさんの「Common Lisp Scriptの話」が気になります。フロントエンドもCommon Lispで書ける未来も近いんでしょうか。

どうぞ奮ってご参加ください。

*1:現在Quicklisp Alpha distには含まれています。1月のリリースには含まれる予定です。