RedisをErlangから触ってみた

前回はRubyからRedisに値を保存したり参照したりしてみました。また、前前回はRedisをコンパイルして起動し、添付のクライアントで値を保存したり参照したりしてみました。

多機能、高速かつデータの永続化が可能なキーバリューストアRedisをさわってみた
RedisをRubyから触ってみた

今回はErlangから同様の事をやってみようと思います。
ErlangのRedisライブラリは幾つかあるようですが、とりあえずは最もシンプルなeredisを使ってみようと思います。

wooga/eredis

コンパイル

$ git clone https://github.com/wooga/eredis.git
$ cd eredis
$./rebar compile

起動
Erlangノードを起動してサーバを起動します。

$ erl -pa ebin
{ok, Pid} = eredis:start_link().
{ok,<0.33.0>}

ホストとポートを指定する場合

{ok, Pid} = eredis:start_link("127.0.0.1", 6379).
{ok,<0.33.0>}

コマンド操作
操作は至ってシンプルで"eredis:q/2"の2番目の引数に[コマンド, 引数1, 引数2, ...]のようなリストを渡します。コマンドの所にはredisのコマンドをそのまま書けばいいようです。

redisのコマンド

ちなみに[A, B, C] [A | [B, C] ] と同じなので、どちらでもよいです。後者はコマンドと引数が離れてて見やすいかもしれませんが、まあ好みですねw

あと、キーもバリューもアトムが使えますが、アトムはメモリのGCによる解放がされないとの事なので実際のアプリケーションでキーにアトムを使う事は少ないのではないかと思います。なので以下の例ではキー、バリューともにバイナリまたは文字列を使っています。

値の保存と参照
キーを指定して値を保存

eredis:q(Pid, ["set" | [<<"foo">>, <<"bar">>]]).
{ok,<<"OK">>}

キーを指定して値を取得

eredis:q(Pid, ["get" | [<<"foo">>]]).           
{ok,<<"bar">>}

もう一つ、キー<<"hoge">>に対して値<<"tara">>を保存

eredis:q(Pid, ["set" | [<<"hoge">>, <<"tara">>]]).
{ok,<<"OK">>}

キー<<"foo">>と<<"hoge">>の値をいっぺんに取得

eredis:q(Pid, ["mget" | [<<"foo">>, <<"hoge">>]]). 
{ok,[<<"bar">>,<<"tara">>]}

リスト操作
リストの末尾に値を保存

eredis:q(Pid, [<<"rpush">> | [<<"foo_list">>, <<"a">>]]).
{ok,<<"1">>}

同様にあと4つの値を保存

> eredis:q(Pid, [<<"rpush">> | [<<"foo_list">>, <<"b">>]]).
> eredis:q(Pid, [<<"rpush">> | [<<"foo_list">>, <<"c">>]]).
> eredis:q(Pid, [<<"rpush">> | [<<"foo_list">>, <<"d">>]]).
> eredis:q(Pid, [<<"rpush">> | [<<"foo_list">>, <<"e">>]]).

0番目から4番目までの5つの要素を取得

eredis:q(Pid, [<<"lrange">> | [<<"foo_list">>, 0, 4]]).
{ok,[<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>]}

0番目から100番目まで取得(実際には存在する分だけ取得する)

> eredis:q(Pid, [<<"lrange">> | [<<"foo_list">>, 0, 100]]).
{ok,[<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>]}

リストの末尾の3要素を取得

eredis:q(Pid, [<<"lrange">> | [<<"foo_list">>, -3, -1]]).
{ok,[<<"c">>,<<"d">>,<<"e">>]}

値の削除
キーを指定して値を削除

eredis:q(Pid, ["del" | [<<"hoge">>]]).             
{ok,<<"1">>}

ハッシュ操作
ハッシュ操作では通常のハッシュでキーといっているものを「フィールド」と表現しているようです。下の例では数値をキーにしてみましたが、もちろんバイナリ等でもかまいません。

キーに対して、フィールドと値の組を保存する。

eredis:q(Pid, ["hset" | [<<"members">>, 1, <<"tama">>]]).         
{ok,<<"1">>}

同様の手順であと2つほど保存する

eredis:q(Pid, ["hset" | [<<"members">>, 2, <<"ume">>]]). 
{ok,<<"0">>}
eredis:q(Pid, ["hset" | [<<"members">>, 3, <<"dora">>]]).
{ok,<<"1">>}

キーとフィールドを指定して値を参照する

eredis:q(Pid, ["hget" | [<<"members">>, 1]]).    
{ok,<<"tama">>}

eredis:q(Pid, ["hget" | [<<"members">>, 2]]).
{ok,<<"ume">>}

eredis:q(Pid, ["hget" | [<<"members">>, 3]]).
{ok,<<"dora">>}

複数のキーを指定して、複数の値をいっぺんに参照する

eredis:q(Pid, ["hmget" | [<<"members">>, 1 , 2 ,3]]).
{ok,[<<"tama">>,<<"ume">>,<<"dora">>]}

フィールドを指定して、ハッシュ表からキーを削除する

eredis:q(Pid, ["hdel" | [<<"members">>, 1]]).        
{ok,<<"1">>}

削除した値を参照してみる

eredis:q(Pid, ["hget" | [<<"members">>, 1]]).        
{ok,undefined}

100万件保存テスト
100万件のデータの保存にかかる時間や、そこから指定件数分取得するのにかかる時間を調べてみました。

今回はちょっとしたテストなので今居るeredisのルートディレクトリに以下のようなテストプログラムを作りました。ファイル名は"redis_test.erl"です。

-module(redis_test).

-define(KEY, <<"sample">>).

-export([push_to_list/1, parallel_push_to_list/2, get_from_list/2, delete/0]).

%%--------------------------------------------------------------------
%% @doc Count件数のデータをリストに追加し、最終的な件数と経過時間を表示する.
%%--------------------------------------------------------------------
push_to_list(Count) ->
    {ok, Pid} = eredis:start_link(),

    StartTime = get_seconds(),
    set_value_to_list(Pid, Count, ?KEY),
    EndTime = get_seconds(),

    PassedSeconds = EndTime - StartTime,
    {ok, LenBin} = eredis:q(Pid, ["llen", ?KEY]),

    io:format("Length     : ~p~n", [binary_to_list(LenBin)]),
    io:format("Passed Time: ~w sec~n", [PassedSeconds]).

%%--------------------------------------------------------------------
%% @doc ProcessCount個数のプロセスから並列にCount件数のデータをリストに追加し、最終的な件数と経過時間を表示する.
%%--------------------------------------------------------------------
parallel_push_to_list(ProcessCount, Count) ->
    {ok, Pid} = eredis:start_link(),    

    StartTime = get_seconds(),
    parallel_push(ProcessCount, Count, Pid),
    collect_loop(ProcessCount),
    EndTime = get_seconds(),

    PassedSeconds = EndTime - StartTime,
    io:format("Passed Time: ~w sec~n", [PassedSeconds]).


%%--------------------------------------------------------------------
%% @doc 指定した範囲の値を取得して経過時間を表示する.
%%--------------------------------------------------------------------
get_from_list(SIndex, EIndex) ->
    {ok, Pid} = eredis:start_link(),

    StartTime = get_seconds(),
    {ok, List} = eredis:q(Pid, ["lrange" | [?KEY, SIndex, EIndex] ]),
    EndTime = get_seconds(),

    PassedSeconds = EndTime - StartTime,
    io:format("Length     : ~w~n", [length(List)]),
    io:format("Passed Time: ~w sec~n", [PassedSeconds]).
    
%%--------------------------------------------------------------------
%% @doc リストを削除する.
%%--------------------------------------------------------------------
delete() ->
    {ok, Pid} = eredis:start_link(),
    eredis:q(Pid, ["del", ?KEY]).

%%--------------------------------------------------------------------
%% @private
%%
%% @doc Count件数のデータをリストに追加する.
%%--------------------------------------------------------------------
set_value_to_list(_Pid, 0, _Key) -> ok;
set_value_to_list(Pid, Count, Key) ->
    Val = lists:flatten(io_lib:format("this is value of ~w", [Count])),
    {ok, _} = eredis:q(Pid, ["rpush" | [Key, Val] ]),
    set_value_to_list(Pid, Count - 1, Key).
    
%%--------------------------------------------------------------------
%% @private
%%
%% @doc 現在時間を小数点以下を含む秒単位で返す.
%%--------------------------------------------------------------------
get_seconds() ->
    {Mega, Sec, Micro} = now(),
    ((Mega * 1000000 + Sec) * 1000000 + Micro) / 1000000.
    
%%--------------------------------------------------------------------
%% @private
%%
%% @doc Redisに値を保存するプロセスをProcessCount数だけ生成する.
%%--------------------------------------------------------------------
parallel_push(0, _Count, _Pid) -> ok;
parallel_push(ProcessCount, Count, Pid) ->
    ParentPid = self(),
    spawn(fun() ->
                  Key = lists:flatten(io_lib:format("key_~w", [ProcessCount])),
                  set_value_to_list(Pid, Count, Key),
                  ParentPid ! ok
          end),
    parallel_push(ProcessCount - 1, Count, Pid).

%%--------------------------------------------------------------------
%% @private
%%
%% @doc Redisに値を保存する各プロセスからの結果を収集する.
%%--------------------------------------------------------------------
collect_loop(0) -> ok;
collect_loop(Count) ->
    receive
        ok -> collect_loop(Count - 1)
    after 5000 -> {error, timeout}
    end.

Erlangノードを起動してコンパイルします。

c(redis_test).
{ok,redis_test}

1万件追加してみます。

redis_test:push_to_list(10000).
Length     : "10000"
Passed Time: 0.8639841079711914 sec

0.86秒で完了しました。

調子に乗って100万件追加してみます。

redis_test:push_to_list(1000000).
Length     : "1010000"
Passed Time: 88.41989707946777 sec
ok

88秒ほどかかりました。1万件のほぼ100倍ってことは件数が増えても値を追加する時間に影響しないのですね。。

そんじゃ、末尾の1000件を取得してみます。

redis_test:get_from_list(-1000, -1).       
Length     : 1000
Passed Time: 0.006865024566650391 sec
ok

早いw

それは真ん中あたりの1000件を取得すると?

redis_test:get_from_list(-501000, -500001).
Length     : 1000
Passed Time: 0.012251138687133789 sec
ok

やはり早いですが、同じ1000件でも末尾の1000件と比べると2倍の時間がかかってます。これは何回かやってみましたが、やはり末尾に比べて真ん中あたりは2倍の時間がかかってました。もっともこれはデータが100万件の場合で、少なくなるほどこの差も小さいようですが。

ちなみにリストの先頭を取得すると?

redis_test:get_from_list(0, 999).          
Length     : 1000
Passed Time: 0.005799055099487305 sec
ok

末尾と同程度ですね。つまり両端は早くて中程は少し時間がかかるようです。まあ今回の実験に限って言うなら無視出来る範囲ですけどw

並列書き込みテスト
今度はより現実的に、複数のプロセスから同時に書き込みを行ってみます。

redis_test:parallel_push_to_list(10000, 100).
Passed Time: 5.101668119430542 sec
ok

1万個のプロセスそれぞれが100件、同時に書き込みます。合計100万個のデータが書き込まれます。

もう一回同じ事をやってみます。

redis_test:parallel_push_to_list(10000, 100).   
Passed Time: 0.29964399337768555 sec
ok

はやっwww
これは幾ら何でも早すぎ...って思ってデータの件数を調べてみると処理終了後に徐々に増えていってます。これってRedisは書き込みが非同期ってことなのかな?まだRedisをちゃんと理解出来てないので良く解りませんが...

eredisの動作について
今回使用したライブラリeredisは1プロセスでクライアントからのリクエストをノンブロッキングに処理するようです。READMEにも書いてありますし、ソースをざっと見た所、クライアントからリクエストがあると、

  1. Redisにリクエストを投げる。
  2. 返信先プロセス番号をキューの末尾に追加する。
  3. すぐさま次のリクエストの待機にもどる。
  4. Redisから結果が帰ってくると今度はキューの先頭にあるプロセス番号を取り出し、そのプロセスに向かって結果を返す。

という流れで処理するようです。これってRedis自体もはひとつのソケットで応答を返す前に次のリクエストを受け付けられるってことなんでしょうかね...どうもRedisをきちんと理解していないのが立ちはだかります。

Redis自体をちゃんと勉強しなければ...

[追記]
ここ(http://redis.shibu.jp/developer/pipelining.html)に書いてありました。やはりRedis自体、ノンブロッキングに次々とリクエストを受け付けられるようです(こういうのをパイプライニングというらしい...)。
従って、eredisの1プロセスで複数クライアントのリクエストを同時に捌けるという事ですね。ただ、応答に時間のかかるリクエストが含まれると、その後のリクエストへの応答はそれが終わるまで待たされる事になりますけど...