Riak 2.0のPlumtreeを読む¶
昨年のRiakアドベントカレンダーでしのはらさんが、 Riak 2.0 : クラスタ全体のデータ共有を効率 という記事を書かれています。
これによると、Riak 2.0ではgossip protocolに加えて Plumtreeという論文 をもとに実装されるツリー状の経路を通って通信するプロトコルが追加される とのことです。
紹介されている スライド を読んでいて興味が湧いてきたのでriak_coreを読んで みることにします。
ちなみにlogを見ると、2013年8月1日にこれらの変更が最初に入ったようです。
警告
例によって間違っている可能性は大いにあります。ご指摘をお待ちしております。
tl; dr;¶
- Riak 2.0からgossipプロトコルが効率良くなるよ 
- 10台以上から有効化されるよ- 01/23追記: 間違いでした!常にplumtreeが使われていて、gossipを使うのは最初のtreeを作るところだけでした。 
用語定義¶
- eager
- すぐに送るノード 
- lazy
- あとで送るノードリスト 
- outstanding
- lazyで送ったけどまだackが返ってきていないメッセージ 
- plumtree
- 今回導入されたツリー状の経路を通る通信プロトコルのこと。元論文では Epidemic Broadcast Treesと言ってるし、コード中にどこにもplumtreeと書 かれてないけど。(1/14追記: 元論文で普通にplumtreeと書かれていました…あれー) 
riak_core_broadcast.erl¶
eagerでgrepすると、このファイルが引っかかります。クラスタ全体に broadcastをする機能が実装されている部分です。
この中に init_peers(Members) という関数があります。この関数でどの通信 プロトコルを使うか決定しています。
- ノードが1つだけの場合
- 自分自身だけだから特になし 
- ノードが2つの場合
- お互い送りあうだけ 
- ノードが2から4の場合
- お互いに接続(メッシュで接続) 
- ノードが5から9の場合
- riak_core_gossip.erl で定義されているgossipプロトコルを使うgossipで使っているtree構造を初期ツリーとして使う
- ノードが10以上の場合
- plumtreeを使う 
すなわち、2.0から入ったツリー上の径路を使うには10ノード以上必要というこ とですね。それ以下であれば当初のツリー構造はgossipで十分ということでしょ う。(この10という数字がどこからきているのかは気になるところですが)
なお、init_peers/1は handle_cast({ring_update, Ring} ...) でも呼ばれ ており、ringが更新されるたびに通信プロトコルを決め直すことが分かります。
init_peers/1の中で riak_core_util:build_tree/3 が呼ばれており、経路を作る実態はこの中のようです。
riak_core_util:build_tree/3¶
build_tree/3では、渡されたリストからN分木を構築します。Nはメッシュ の場合は1、gossipの場合は2, plumtreeの場合はround(math:log(N) + 1)となります。
ちなみにinit_parse/1からbuild_tree/3が呼び出されるときの第三引数 optionsに[cycles]が入っています。このcycleが入っている場合は、 leafから上のノードへのリンクを持つ、つまりdouble-linkedになります。 (たぶん…)
Flat = [1,
        11, 12,
        111, 112, 121, 122,
        1111, 1112, 1121, 1122, 1211, 1212, 1221, 1222],
というノードのリストが
CTree = [{1,    [  11,   12]},
         {11,   [ 111,  112]},
         {12,   [ 121,  122]},
         {111,  [1111, 1112]},
         {112,  [1121, 1122]},
         {121,  [1211, 1212]},
         {122,  [1221, 1222]},
         {1111, [   1,   11]},
         {1112, [  12,  111]},
         {1121, [ 112,  121]},
         {1122, [ 122, 1111]},
         {1211, [1112, 1121]},
         {1212, [1122, 1211]},
         {1221, [1212, 1221]},
         {1222, [1222,    1]}],
こうなるわけです。
初期Treeができたので、broadcastに戻ります。
実際にあるノードがbroadcastするときは、broadcast/2を呼びます。
broadcast(Broadcast, Mod) ->
    {MessageId, Payload} = Mod:broadcast_data(Broadcast),
    gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).
Broadcastはデータを、Modはmodule()です。このModはringにいるすべてのノードで実行されます。 riak_core_broadcast_handlerで定義されています。
ちょっとこっちを見てみましょう。
riak_core_broadcast_handler.erl¶
ここには
- broadcast_data
- message idとpayloadのtupleを返します 
- merge
- messageをローカルで受け取ります。すでに受け取っていた場合はfalseを返します 
- is_stale
- メッセージをすでに受け取っていればtrueを返します 
- graft
- 与えられたメッセージidからメッセージを返します。このメッセージはすでに送られている可能性があります。その場合はstaleが返ります 
- exchange
- ローカルと与えられたノードとの間でhandlerを交換するトリガーです。exchangeではローカルとリモートとでそれぞれ自分自身にはないメッセージを交換します。 
の5つのcallbackが定義されています。これを頭に入れておくと、このあとよく分かります。
ちなみにexchangeで責任を負うのはすでに受け取っているメッセージに関してだけで、送信されてまだ誰も受け取っていない、途中のメッセージは次のexchangeが責任を持ちます。
再びbroadcast.erl¶
broadcastでは、まずはこっちが呼ばれます。
handle_cast({broadcast, MessageId, Message, Mod}, State) ->
    State1 = eager_push(MessageId, Message, Mod, State),
    State2 = schedule_lazy_push(MessageId, Mod, State1),
    {noreply, State2};
eager_pushは、eagerリストに対してメッセージを送ります。 schedule_lazh_pushはlazyリストに対してあとでメッセージを送ります。
eager_pushはeagerリストを見て、対象のノードにメッセージを送ります。 このあたりですね。
%% 一番最初は自分自身から
eager_push(MessageId, Message, Mod, State) ->
    eager_push(MessageId, Message, Mod, 0, node(), node(), State).
%% あとはeagerリストに基づいて送っていく
eager_push(MessageId, Message, Mod, Round, Root, From, State) ->
    Peers = eager_peers(Root, From, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, Peers),
    State.
broadcastを受け取ったら¶
受け取っていない場合¶
handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
    Valid = Mod:merge(MessageId, Message),
    State1 = handle_broadcast(Valid, MessageId, Message, Mod, Round, Root, From, State),
    {noreply, State1};
まず、上で説明したmerge/2でローカルに取り込みます。まだ受け取っていない 場合、add_eager/3で送信元をeagerリストに追加し、Roundを増やしてさらにeagerに送っていきます。
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
    State1 = add_eager(From, Root, State),
    State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1),
    schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2).
受け取っていた場合¶
しかし、すでに受け取っていた場合はlazyリストに入れ、pruneを送り返します。
handle_broadcast(false, _MessageId, _Message, _Mod, _Round, Root, From, State) -> %% stale msg
    State1 = add_lazy(From, Root, State),
    send({prune, Root, node()}, From),
    State1;
pruneを受け取ったらlazyリストに入れます。
handle_cast({prune, Root, From}, State) ->
    State1 = add_lazy(From, Root, State),
    {noreply, State1};
これでスライドの35ページの図になるわけです。
eagerリストとlazyリスト¶
add_eager/3とadd_lazy/3でeagerリストとlazyリストに追加します。 eagerを追加した場合、lazyから抜きます。また、lazyに追加した場合に はeagerから抜きます。
add_eager(From, Root, State) ->
    update_peers(From, Root, fun ordsets:add_element/2, fun ordsets:del_element/2, State).
add_lazy(From, Root, State) ->
    update_peers(From, Root, fun ordsets:del_element/2, fun ordsets:add_element/2, State).
障害時は?¶
途中経路で障害が起こったときは、lazyリストを使います。
lazyリスト¶
lazyは障害に備えるために管理しているノードです。schedule_lazy_tick/0 で メッセージが最初に送られてから1000msec後にlazyを処理します。
schedule_lazy_tick() ->
    schedule_tick(lazy_tick, broadcast_lazy_timer, 1000).
lazyはいろいろ経由したあとにsend_lazy/4で i_have メッセージをlazyリストに対して送ります。
send_lazy(MessageId, Mod, Round, Root, Peer) ->
    send({i_have, MessageId, Mod, Round, Root, node()}, Peer).
i_haveメッセージ¶
i_haveを受け取ったら、すでに受け取っているかどうかをチェックします。
すでに受け取っている(stale)場合は特になにもしません。これにより、通常時はなにも無いわけです。
そうではない場合、つまり見たことがないメッセージをi_haveで受け取った場合、graftメッセージが送信元に送られます。
handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have
    %% TODO: don't graft immediately
    send({graft, MessageId, Mod, Round, Root, node()}, From),
    add_eager(From, Root, State).
graft¶
graftメッセージを受け取るとgraftを試みます。
handle_cast({graft, MessageId, Mod, Round, Root, From}, State) ->
    Result = Mod:graft(MessageId),
    State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State),
    {noreply, State1};
すでに受け取っていた場合はack_outstandingを返します。
handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
    ack_outstanding(MessageId, Mod, Round, Root, From, State);
ackを受け取ったらoutstandingからメッセージを削除します。
受け取っていないメッセージがgraftで送られてきた場合、なにかがおかしいよ うです。もう一度最初からメッセージを送ります。
handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
    %% we don't ack outstanding here because the broadcast may fail to be delivered
    %% instead we will allow the i_have to be sent once more and let the subsequent
    %% ignore serve as the ack.
    State1 = add_eager(From, Root, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, From),
    State1;
outstandingがあふれる?¶
outstandingには送られてないメッセージが入っているので、そのうちあふれて しまうのでは、と思うのですが、
ring_updateが来た時にneighbors_down/2が呼ばれ、この中でeagerとlazyから 落ちたノードをはずし、outstandingも消します。だからあふれる心配は無いわ けですね。
障害から復帰¶
障害が起きていたノードが復帰した場合、ring_updateが走り、最初の init_peers/1 からやり直されます。
まとめ¶
Riak 2.0で入るツリー状の経路を通ってブロードキャストするプロトコルについてriak_coreを読んでみました。
例えば1000台とかになるとgossipじゃなくてplumtreeの方がいいんでしょうね。 (元論文には障害率とか詳しく載ってます)
しかしerlangは読みやすいなぁ。
Comments
comments powered by Disqus