Riak 2.0のPlumtreeを読む¶
昨年のRiakアドベントカレンダーでしのはらさんが、 Riak 2.0 : クラスタ全体のデータ共有を効率 という記事を書かれています。
これによると、Riak 2.0ではgossip protocolに加えて Plumtreeという論文 をもとに実装されるツリー状の経路を通って通信するプロトコルが追加される とのことです。
紹介されている スライド を読んでいて興味が湧いてきたのでriak_coreを読んで みることにします。
ちなみにlogを見ると、2013年8月1日にこれらの変更が最初に入ったようです。
警告
例によって間違っている可能性は大いにあります。ご指摘をお待ちしております。
tl; dr;¶
Riak 2.0からgossipプロトコルが効率良くなるよ
10台以上から有効化されるよ01/23追記: 間違いでした!常にplumtreeが使われていて、gossipを使うのは最初のtreeを作るところだけでした。
用語定義¶
- eager
すぐに送るノード
- lazy
あとで送るノードリスト
- outstanding
lazyで送ったけどまだackが返ってきていないメッセージ
- plumtree
今回導入されたツリー状の経路を通る通信プロトコルのこと。元論文では Epidemic Broadcast Treesと言ってるし、コード中にどこにもplumtreeと書 かれてないけど。(1/14追記: 元論文で普通にplumtreeと書かれていました…あれー)
riak_core_broadcast.erl¶
eagerでgrepすると、このファイルが引っかかります。クラスタ全体に broadcastをする機能が実装されている部分です。
この中に init_peers(Members) という関数があります。この関数でどの通信 プロトコルを使うか決定しています。
- ノードが1つだけの場合
自分自身だけだから特になし
- ノードが2つの場合
お互い送りあうだけ
- ノードが2から4の場合
お互いに接続(メッシュで接続)
- ノードが5から9の場合
riak_core_gossip.erl で定義されているgossipプロトコルを使うgossipで使っているtree構造を初期ツリーとして使う- ノードが10以上の場合
plumtreeを使う
すなわち、2.0から入ったツリー上の径路を使うには10ノード以上必要というこ とですね。それ以下であれば当初のツリー構造はgossipで十分ということでしょ う。(この10という数字がどこからきているのかは気になるところですが)
なお、init_peers/1は handle_cast({ring_update, Ring} ...) でも呼ばれ ており、ringが更新されるたびに通信プロトコルを決め直すことが分かります。
init_peers/1の中で riak_core_util:build_tree/3 が呼ばれており、経路を作る実態はこの中のようです。
riak_core_util:build_tree/3¶
build_tree/3では、渡されたリストからN分木を構築します。Nはメッシュ の場合は1、gossipの場合は2, plumtreeの場合はround(math:log(N) + 1)となります。
ちなみにinit_parse/1からbuild_tree/3が呼び出されるときの第三引数 optionsに[cycles]が入っています。このcycleが入っている場合は、 leafから上のノードへのリンクを持つ、つまりdouble-linkedになります。 (たぶん…)
Flat = [1,
11, 12,
111, 112, 121, 122,
1111, 1112, 1121, 1122, 1211, 1212, 1221, 1222],
というノードのリストが
CTree = [{1, [ 11, 12]},
{11, [ 111, 112]},
{12, [ 121, 122]},
{111, [1111, 1112]},
{112, [1121, 1122]},
{121, [1211, 1212]},
{122, [1221, 1222]},
{1111, [ 1, 11]},
{1112, [ 12, 111]},
{1121, [ 112, 121]},
{1122, [ 122, 1111]},
{1211, [1112, 1121]},
{1212, [1122, 1211]},
{1221, [1212, 1221]},
{1222, [1222, 1]}],
こうなるわけです。
初期Treeができたので、broadcastに戻ります。
実際にあるノードがbroadcastするときは、broadcast/2を呼びます。
broadcast(Broadcast, Mod) ->
{MessageId, Payload} = Mod:broadcast_data(Broadcast),
gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).
Broadcastはデータを、Modはmodule()です。このModはringにいるすべてのノードで実行されます。 riak_core_broadcast_handlerで定義されています。
ちょっとこっちを見てみましょう。
riak_core_broadcast_handler.erl¶
ここには
- broadcast_data
message idとpayloadのtupleを返します
- merge
messageをローカルで受け取ります。すでに受け取っていた場合はfalseを返します
- is_stale
メッセージをすでに受け取っていればtrueを返します
- graft
与えられたメッセージidからメッセージを返します。このメッセージはすでに送られている可能性があります。その場合はstaleが返ります
- exchange
ローカルと与えられたノードとの間でhandlerを交換するトリガーです。exchangeではローカルとリモートとでそれぞれ自分自身にはないメッセージを交換します。
の5つのcallbackが定義されています。これを頭に入れておくと、このあとよく分かります。
ちなみにexchangeで責任を負うのはすでに受け取っているメッセージに関してだけで、送信されてまだ誰も受け取っていない、途中のメッセージは次のexchangeが責任を持ちます。
再びbroadcast.erl¶
broadcastでは、まずはこっちが呼ばれます。
handle_cast({broadcast, MessageId, Message, Mod}, State) ->
State1 = eager_push(MessageId, Message, Mod, State),
State2 = schedule_lazy_push(MessageId, Mod, State1),
{noreply, State2};
eager_pushは、eagerリストに対してメッセージを送ります。 schedule_lazh_pushはlazyリストに対してあとでメッセージを送ります。
eager_pushはeagerリストを見て、対象のノードにメッセージを送ります。 このあたりですね。
%% 一番最初は自分自身から
eager_push(MessageId, Message, Mod, State) ->
eager_push(MessageId, Message, Mod, 0, node(), node(), State).
%% あとはeagerリストに基づいて送っていく
eager_push(MessageId, Message, Mod, Round, Root, From, State) ->
Peers = eager_peers(Root, From, State),
send({broadcast, MessageId, Message, Mod, Round, Root, node()}, Peers),
State.
broadcastを受け取ったら¶
受け取っていない場合¶
handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
Valid = Mod:merge(MessageId, Message),
State1 = handle_broadcast(Valid, MessageId, Message, Mod, Round, Root, From, State),
{noreply, State1};
まず、上で説明したmerge/2でローカルに取り込みます。まだ受け取っていない 場合、add_eager/3で送信元をeagerリストに追加し、Roundを増やしてさらにeagerに送っていきます。
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
State1 = add_eager(From, Root, State),
State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1),
schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2).
受け取っていた場合¶
しかし、すでに受け取っていた場合はlazyリストに入れ、pruneを送り返します。
handle_broadcast(false, _MessageId, _Message, _Mod, _Round, Root, From, State) -> %% stale msg
State1 = add_lazy(From, Root, State),
send({prune, Root, node()}, From),
State1;
pruneを受け取ったらlazyリストに入れます。
handle_cast({prune, Root, From}, State) ->
State1 = add_lazy(From, Root, State),
{noreply, State1};
これでスライドの35ページの図になるわけです。
eagerリストとlazyリスト¶
add_eager/3とadd_lazy/3でeagerリストとlazyリストに追加します。 eagerを追加した場合、lazyから抜きます。また、lazyに追加した場合に はeagerから抜きます。
add_eager(From, Root, State) ->
update_peers(From, Root, fun ordsets:add_element/2, fun ordsets:del_element/2, State).
add_lazy(From, Root, State) ->
update_peers(From, Root, fun ordsets:del_element/2, fun ordsets:add_element/2, State).
障害時は?¶
途中経路で障害が起こったときは、lazyリストを使います。
lazyリスト¶
lazyは障害に備えるために管理しているノードです。schedule_lazy_tick/0 で メッセージが最初に送られてから1000msec後にlazyを処理します。
schedule_lazy_tick() ->
schedule_tick(lazy_tick, broadcast_lazy_timer, 1000).
lazyはいろいろ経由したあとにsend_lazy/4で i_have メッセージをlazyリストに対して送ります。
send_lazy(MessageId, Mod, Round, Root, Peer) ->
send({i_have, MessageId, Mod, Round, Root, node()}, Peer).
i_haveメッセージ¶
i_haveを受け取ったら、すでに受け取っているかどうかをチェックします。
すでに受け取っている(stale)場合は特になにもしません。これにより、通常時はなにも無いわけです。
そうではない場合、つまり見たことがないメッセージをi_haveで受け取った場合、graftメッセージが送信元に送られます。
handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have
%% TODO: don't graft immediately
send({graft, MessageId, Mod, Round, Root, node()}, From),
add_eager(From, Root, State).
graft¶
graftメッセージを受け取るとgraftを試みます。
handle_cast({graft, MessageId, Mod, Round, Root, From}, State) ->
Result = Mod:graft(MessageId),
State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State),
{noreply, State1};
すでに受け取っていた場合はack_outstandingを返します。
handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
ack_outstanding(MessageId, Mod, Round, Root, From, State);
ackを受け取ったらoutstandingからメッセージを削除します。
受け取っていないメッセージがgraftで送られてきた場合、なにかがおかしいよ うです。もう一度最初からメッセージを送ります。
handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
%% we don't ack outstanding here because the broadcast may fail to be delivered
%% instead we will allow the i_have to be sent once more and let the subsequent
%% ignore serve as the ack.
State1 = add_eager(From, Root, State),
send({broadcast, MessageId, Message, Mod, Round, Root, node()}, From),
State1;
outstandingがあふれる?¶
outstandingには送られてないメッセージが入っているので、そのうちあふれて しまうのでは、と思うのですが、
ring_updateが来た時にneighbors_down/2が呼ばれ、この中でeagerとlazyから 落ちたノードをはずし、outstandingも消します。だからあふれる心配は無いわ けですね。
障害から復帰¶
障害が起きていたノードが復帰した場合、ring_updateが走り、最初の init_peers/1 からやり直されます。
まとめ¶
Riak 2.0で入るツリー状の経路を通ってブロードキャストするプロトコルについてriak_coreを読んでみました。
例えば1000台とかになるとgossipじゃなくてplumtreeの方がいいんでしょうね。 (元論文には障害率とか詳しく載ってます)
しかしerlangは読みやすいなぁ。
Comments
comments powered by Disqus