riak_coreのcommandを実装してみる

すでにかなり時間がたってしまいましたが、 前回 ではriak_coreを使 うアプリケーションの雛形を作成するところまでを試しました。

今回は、実際にriak_coreの上でアプリケーションを実装してみます。

riak_coreを使うアプリケーションとは、riak_core上に存在するvnode群に対して

  1. コマンドを送り、

  2. 何かしらの処理を行い、

  3. (必要であれば)結果を返す

という一連の操作を行うロジックを実装したものです。ノードのjoinや stabilizeなどの面倒な部分はすべてriak_coreが面倒を見てくれるため、アプ リケーション実装者はこの3つだけを考えればいいわけです。

注釈

という感じで断言してますが、この定義が正しいかどうか、あるい はこの記事全体が正しいかどうかは自信がないのでツッコミ大歓迎 です。

1. command

まずはコマンドを送る部分です。 riak_core_vnode_master に以下の3種類のコマンドが定義されています。

  • command

  • sync_command

  • sync_spawn_command

command は通常使うもので、非同期で指定のコマンドを送ります。

sync_command はsyncが付いている通り、返答が返ってくるまでブロックします。

sync_spawn_commandsync_command 同様で返答が返ってくるまでブロックしますが、MLの 一連のスレッド を見ると実際には使われていないようです。

が、rebar_riak_coreを使うとサンプルのpingで使われていますね。 ソース

基本的にはこんな感じで使います。

dping(Address) ->
   %% ランダムに接続ノードを策定
   DocIdx = riak_core_util:chash_key({<<"dping">>, term_to_binary(now())}),

   %% PrefListからIndexNodeを導き出す
   PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, {{appid}}),
   [{IndexNode, _Type}] = PrefList,

   io:format("message to ~p from ~p~n", [Address, self()]),

   %% コマンド実行。第二引数のメッセージ、第三引数はdpingアプリのvnode_master
   Message = {dping, Address},  %% メッセージ作成
   {Msg, Partition} = riak_core_vnode_master:sync_command(IndexNode, Message, dping_vnode_master),
   io:format("~p, ~p", [Msg, Partition]).

Preflist

接続する先のノードをPrefList、すなわち Prefered Listと呼びます。上記例では、ランダムに接続ノードを決定しています。

riak_core_apl にこのPrefListを得る関数があります。このあたりです。

  • get_apl/3 get_apl(binary(), n_val(), atom()) -> preflist().

  • get_apl/4 get_apl(binary(), n_val(), ring(), [node()]) -> preflist().

  • get_primary_apl/3 get_primary_apl(binary(), n_val(), atom()) -> preflist2().

  • get_primary_apl/4 get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist2().

primaryは、get_aplと同じですが、Primaryだけが返されます。

PrefListについては今調査中ですので、次回の宿題とさせてください。

2. なにかしらの処理

次は処理です。

前回作成した中に dping_vnode.erl というファイルがあり、そこに以下の ようなコードがあるはずです。

%% Sample command: respond to a ping
handle_command(ping, _Sender, State) ->
     {reply, {pong, State#state.partition}, State};

この handle_command でコマンドのメッセージを受け取り、処理します。例えば、dpingメッセージを受け取る場合はこんな感じのコードを追加します。

handle_command({dping, Address}, _Sender, State) ->
    io:format("dping recieved ~p ~p~n", [Address, self()]),
    {reply, {success, State#state.partition}, State};

こう書いておいて、 dping console でconsoleにつなげ、dpingと打つとこのように返ってきます。

(dping2@127.0.0.1)17> dping("hoge").
message to "hoge" from <0.235.0>
dping recieved "hoge" <0.266.0>
success,662242929415565384811044689824565743281594433536

この場合、打ったnodeは <0.235.0>、受け取ったnodeは<0.266.0>ですね。

なお、dpingコマンドは user_default.erl に書いておくとerlang shellが自 動的に読み込んでくれるのでモジュール名を打たなくて済むので便利です。今回はこんな感じですね。

dping(Address) ->
    dping:dping(Address).

3. 返答

返答は普通です。

{reply, {success, State#state.partition}, State};

としています。 {reply or noreply, メッセージ, State}です。一つ目は replynoreply で、noreplyにすると、送信元nodeにメッセージを返し ません。

なお、実行側には

{Msg, Partition} = riak_core_vnode_master:sync_command(IndexNode, Message, dping_vnode_master),

と書いたように、真ん中のメッセージ部分だけが返ってきます。(Stateについては調査中)

まとめ

  1. riak_coreを使うアプリケーションでは通信にriak_core_vnode_master:command/3かsync_command/3を使う

  2. 処理内容は<アプリ名>_vnode.erlでhandle_command/3に書く

次回はPrefListについて調べます。

Comments

comments powered by Disqus