このエントリーをはてなブックマークに追加

riak_coreのcommandを実装してみる

すでにかなり時間がたってしまいましたが、前回ではriak_coreを使うアプリケーションの雛形を作成するところまでを試しました。

今回は、実際にriak_coreの上でアプリケーションを実装してみます。

riak_coreを使うアプリケーションとは、riak_core上に存在するvnode群に対して

  1. コマンドを送り、
  2. 何かしらの処理を行い、
  3. (必要であれば)結果を返す

という一連の操作を行うロジックを実装したものです。ノードのjoinやstabilizeなどの面倒な部分はすべてriak_coreが面倒を見てくれるため、アプリケーション実装者はこの3つだけを考えればいいわけです。

注釈

という感じで断言してますが、この定義が正しいかどうか、あるいはこの記事全体が正しいかどうかは自信がないのでツッコミ大歓迎です。

1. command

まずはコマンドを送る部分です。riak_core_vnode_masterに以下の3種類のコマンドが定義されています。

  • command
  • sync_command
  • sync_spawn_command

commandは通常使うもので、非同期で指定のコマンドを送ります。

sync_commandはsyncが付いている通り、返答が返ってくるまでブロックします。

sync_spawn_commandsync_command同様で返答が返ってくるまでブロックしますが、MLの一連のスレッドを見ると実際には使われていないようです。

が、rebar_riak_coreを使うとサンプルのpingで使われていますね。ソース

基本的にはこんな感じで使います。

dping(Address) ->
   %% ランダムに接続ノードを策定
   DocIdx = riak_core_util:chash_key({<<"dping">>, term_to_binary(now())}),

   %% PrefListからIndexNodeを導き出す
   PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, {{appid}}),
   [{IndexNode, _Type}] = PrefList,

   io:format("message to ~p from ~p~n", [Address, self()]),

   %% コマンド実行。第二引数のメッセージ、第三引数はdpingアプリのvnode_master
   Message = {dping, Address},  %% メッセージ作成
   {Msg, Partition} = riak_core_vnode_master:sync_command(IndexNode, Message, dping_vnode_master),
   io:format("~p, ~p", [Msg, Partition]).

Preflist

接続する先のノードをPrefList、すなわち Prefered Listと呼びます。上記例では、ランダムに接続ノードを決定しています。

riak_core_aplにこのPrefListを得る関数があります。このあたりです。

  • get_apl/3 get_apl(binary(), n_val(), atom()) -> preflist().
  • get_apl/4 get_apl(binary(), n_val(), ring(), [node()]) -> preflist().
  • get_primary_apl/3 get_primary_apl(binary(), n_val(), atom()) -> preflist2().
  • get_primary_apl/4 get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist2().

primaryは、get_aplと同じですが、Primaryだけが返されます。

PrefListについては今調査中ですので、次回の宿題とさせてください。

2. なにかしらの処理

次は処理です。

前回作成した中にdping_vnode.erlというファイルがあり、そこに以下のようなコードがあるはずです。

%% Sample command: respond to a ping
handle_command(ping, _Sender, State) ->
     {reply, {pong, State#state.partition}, State};

このhandle_commandでコマンドのメッセージを受け取り、処理します。例えば、dpingメッセージを受け取る場合はこんな感じのコードを追加します。

handle_command({dping, Address}, _Sender, State) ->
    io:format("dping recieved ~p ~p~n", [Address, self()]),
    {reply, {success, State#state.partition}, State};

こう書いておいて、dping consoleでconsoleにつなげ、dpingと打つとこのように返ってきます。

(dping2@127.0.0.1)17> dping("hoge").
message to "hoge" from <0.235.0>
dping recieved "hoge" <0.266.0>
success,662242929415565384811044689824565743281594433536

この場合、打ったnodeは <0.235.0>、受け取ったnodeは<0.266.0>ですね。

なお、dpingコマンドはuser_default.erlに書いておくとerlang shellが自動的に読み込んでくれるのでモジュール名を打たなくて済むので便利です。今回はこんな感じですね。

dping(Address) ->
    dping:dping(Address).

3. 返答

返答は普通です。

{reply, {success, State#state.partition}, State};

としています。 {reply or noreply, メッセージ, State}です。一つ目はreplynoreplyで、noreplyにすると、送信元nodeにメッセージを返しません。

なお、実行側には

{Msg, Partition} = riak_core_vnode_master:sync_command(IndexNode, Message, dping_vnode_master),

と書いたように、真ん中のメッセージ部分だけが返ってきます。(Stateについては調査中)

まとめ

  1. riak_coreを使うアプリケーションでは通信にriak_core_vnode_master:command/3かsync_command/3を使う
  2. 処理内容は<アプリ名>_vnode.erlでhandle_command/3に書く

次回はPrefListについて調べます。