メモ: Redisをジョブキューとして使う

Redisについて
Redisはいわゆるオンメモリで動作して永続化もしてくれる高速なキーバリューストアですが、ノティフィケーションのような機能ももってます。
実はジョブキューのようなものは無いかなと最初はRabbitMQを調べていたのですが、そういやRedisにそういう使い方できそうなコマンドがあったような。と思ってみてみたらありました。

コマンド
該当のコマンドはPUBLISHとSUBSCRIBEです。

  1. SUBSCRIBEは現在のコネクションで特定のキーワードの通知が来るのを待機開始するコマンド
  2. PUBLISHは通知を送信するコマンド

です。

具体的には

SUBSCRIBE foo

とするとキーワードfooで通知を待ち受け、

PUBLISH foo hoge

とするとhogeというメッセージとともにSUBSCRIBEしているコネクションにメッセージを送ります。

実際にコマンドラインからやってみましょう。Redisに付属するクライアントredis-cliを使います。srcディレクトリのナカニあります。
Redisはローカルホストの標準ポート(6379)で動作しているものとします。

まずは待ち受け用のクライアントを起動します。

./redis-cli

"foo"というキーワードでSUBSCRIBEコマンドを発行します。

SUBSCRIBE foo

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "foo"
3) (integer) 1

どうやら待ち受けがはじまりました。

続いて通知送信のためのクライアントを起動します。同じくredis-cli

./redis-cli

PUBLISHコマンドで通知を送信します。

PUBLISH foo hoge

すぐさま最初のSUBSCRIBEしたクライアントで通知を受け取りました。

SUBSCRIBE foo
Reading messages... (press Ctrl-C to quit)
...

1) "message"
2) "foo"
3) "hoge"

どうやらうまくいきました。
それではErlangのRedisクライアントライブラリeredisで試してみます。

eredisでRedisのPub/Subコマンドを試す
さて、eredisを使う場合は普通はeredis:q(Conn, [コマンド | 引数リスト]) などとしますが、待ち受けようにSUBSCRIBEする場合はこれだとエラーが出ます。
ではどうするかというと、eredis_sub_clientというのを使うのですが、この使い方の説明用とでもいうか、eredis_subというものが用意されています。名前からこれが本命かのようですが、あくまでサンプルとしてみた用が良さそうです。が、折角用意してあるので使ってみます。

eredisをmakeした上で、erlコマンドで対話モードを起動します。

erl -pa ebin

起動したらeredis_sub:sub_example() を実行します。

eredis_sub:sub_example().

この中で何をしているかというと、コードはこんな感じになってます。

sub_example() ->
    {ok, Sub} = start_link(),
    Receiver = spawn_link(fun () ->
                                  controlling_process(Sub),
                                  subscribe(Sub, [<<"foo">>]),
                                  receiver(Sub)
                          end),
    {Sub, Receiver}.
  1. 最初にstart_linkでeredis_sub_client:start_link/6を呼んでgen_serverを起動
  2. 続いて別プロセスをspawn
  3. spawnしたプロセス内でcontrolling_process/0を呼んで通知がこのspawnしたプロセスに来るように設定
  4. 続いてsubscrive/2を呼んで"foo"というキーワードの通知を待つ事をredisに知らせる
  5. 最後にreceive/1を呼んで再帰ループでメッセージを待機します。

送信はさっきのredis-cliからやってもいいのですが、折角なのでこっちもeredisから。送信側は従来通りのコマンド発行でOKです。

{ok, C} = eredis:start_link().
eredis:q(C, ["PUBLISH", "foo", "helo"]).

PUBLISHした直後に待ち受けていたノードで

received {message,<<"foo">>,<<"helo">>,<0.33.0>}

と表示されれば成功です。
実際には表示だけしてもしょうがないので、ここで受け取ったメッセージからなんらかの処理をしたりする事になると思います。例えば別にリストを用意しておいて、そこになんらかのフォーマットでジョブを入れてからPUBLISHで通知、事前にSUBSCRIBEしていたプロセスが通知を受け取ると同時にリストからジョブを取り出して処理...とかもできそうです。

あと、当然待ち受け側と送信側が別の言語で動作していてもいいですね。ジョブキューのシステムとしてはRabbitMQが有名ですが、ちょっとしたことなら案外こっちが単純で良かったりするかもしれません。