このエントリーをはてなブックマークに追加

Riak 2.0のPlumtreeを読む

昨年のRiakアドベントカレンダーでしのはらさんが、Riak 2.0 : クラスタ全体のデータ共有を効率という記事を書かれています。

これによると、Riak 2.0ではgossip protocolに加えてPlumtreeという論文をもとに実装されるツリー状の経路を通って通信するプロトコルが追加されるとのことです。

紹介されているスライドを読んでいて興味が湧いてきたのでriak_coreを読んでみることにします。

ちなみにlogを見ると、2013年8月1日にこれらの変更が最初に入ったようです。

警告

例によって間違っている可能性は大いにあります。ご指摘をお待ちしております。

tl; dr;

  • Riak 2.0からgossipプロトコルが効率良くなるよ

  • 10台以上から有効化されるよ

    01/23追記: 間違いでした!常にplumtreeが使われていて、gossipを使うのは最初のtreeを作るところだけでした。

用語定義

eager
すぐに送るノード
lazy
あとで送るノードリスト
outstanding
lazyで送ったけどまだackが返ってきていないメッセージ
plumtree
今回導入されたツリー状の経路を通る通信プロトコルのこと。元論文ではEpidemic Broadcast Treesと言ってるし、コード中にどこにもplumtreeと書かれてないけど。(1/14追記: 元論文で普通にplumtreeと書かれていました…あれー)

riak_core_broadcast.erl

eagerでgrepすると、このファイルが引っかかります。クラスタ全体にbroadcastをする機能が実装されている部分です。

この中にinit_peers(Members)という関数があります。この関数でどの通信プロトコルを使うか決定しています。

ノードが1つだけの場合
自分自身だけだから特になし
ノードが2つの場合
お互い送りあうだけ
ノードが2から4の場合
お互いに接続(メッシュで接続)
ノードが5から9の場合
riak_core_gossip.erl で定義されているgossipプロトコルを使うgossipで使っているtree構造を初期ツリーとして使う
ノードが10以上の場合
plumtreeを使う

すなわち、2.0から入ったツリー上の径路を使うには10ノード以上必要ということですね。それ以下であれば当初のツリー構造はgossipで十分ということでしょう。(この10という数字がどこからきているのかは気になるところですが)

なお、init_peers/1はhandle_cast({ring_update, Ring} ...)でも呼ばれており、ringが更新されるたびに通信プロトコルを決め直すことが分かります。

init_peers/1の中でriak_core_util:build_tree/3が呼ばれており、経路を作る実態はこの中のようです。

riak_core_util:build_tree/3

build_tree/3では、渡されたリストからN分木を構築します。Nはメッシュの場合は1、gossipの場合は2, plumtreeの場合はround(math:log(N) +1)となります。

ちなみにinit_parse/1からbuild_tree/3が呼び出されるときの第三引数optionsに[cycles]が入っています。このcycleが入っている場合は、leafから上のノードへのリンクを持つ、つまりdouble-linkedになります。(たぶん…)

Flat = [1,
        11, 12,
        111, 112, 121, 122,
        1111, 1112, 1121, 1122, 1211, 1212, 1221, 1222],

というノードのリストが

CTree = [{1,    [  11,   12]},
         {11,   [ 111,  112]},
         {12,   [ 121,  122]},
         {111,  [1111, 1112]},
         {112,  [1121, 1122]},
         {121,  [1211, 1212]},
         {122,  [1221, 1222]},
         {1111, [   1,   11]},
         {1112, [  12,  111]},
         {1121, [ 112,  121]},
         {1122, [ 122, 1111]},
         {1211, [1112, 1121]},
         {1212, [1122, 1211]},
         {1221, [1212, 1221]},
         {1222, [1222,    1]}],

こうなるわけです。

初期Treeができたので、broadcastに戻ります。

実際にあるノードがbroadcastするときは、broadcast/2を呼びます。

broadcast(Broadcast, Mod) ->
    {MessageId, Payload} = Mod:broadcast_data(Broadcast),
    gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).

Broadcastはデータを、Modはmodule()です。このModはringにいるすべてのノードで実行されます。riak_core_broadcast_handlerで定義されています。

ちょっとこっちを見てみましょう。

riak_core_broadcast_handler.erl

ここには

broadcast_data
message idとpayloadのtupleを返します
merge
messageをローカルで受け取ります。すでに受け取っていた場合はfalseを返します
is_stale
メッセージをすでに受け取っていればtrueを返します
graft
与えられたメッセージidからメッセージを返します。このメッセージはすでに送られている可能性があります。その場合はstaleが返ります
exchange
ローカルと与えられたノードとの間でhandlerを交換するトリガーです。exchangeではローカルとリモートとでそれぞれ自分自身にはないメッセージを交換します。

の5つのcallbackが定義されています。これを頭に入れておくと、このあとよく分かります。

ちなみにexchangeで責任を負うのはすでに受け取っているメッセージに関してだけで、送信されてまだ誰も受け取っていない、途中のメッセージは次のexchangeが責任を持ちます。

再びbroadcast.erl

broadcastでは、まずはこっちが呼ばれます。

handle_cast({broadcast, MessageId, Message, Mod}, State) ->
    State1 = eager_push(MessageId, Message, Mod, State),
    State2 = schedule_lazy_push(MessageId, Mod, State1),
    {noreply, State2};

eager_pushは、eagerリストに対してメッセージを送ります。schedule_lazh_pushはlazyリストに対してあとでメッセージを送ります。

eager_pushはeagerリストを見て、対象のノードにメッセージを送ります。このあたりですね。

%% 一番最初は自分自身から
eager_push(MessageId, Message, Mod, State) ->
    eager_push(MessageId, Message, Mod, 0, node(), node(), State).

%% あとはeagerリストに基づいて送っていく
eager_push(MessageId, Message, Mod, Round, Root, From, State) ->
    Peers = eager_peers(Root, From, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, Peers),
    State.

broadcastを受け取ったら

受け取っていない場合

handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
    Valid = Mod:merge(MessageId, Message),
    State1 = handle_broadcast(Valid, MessageId, Message, Mod, Round, Root, From, State),
    {noreply, State1};

まず、上で説明したmerge/2でローカルに取り込みます。まだ受け取っていない場合、add_eager/3で送信元をeagerリストに追加し、Roundを増やしてさらにeagerに送っていきます。

handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
    State1 = add_eager(From, Root, State),
    State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1),
    schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2).

受け取っていた場合

しかし、すでに受け取っていた場合はlazyリストに入れ、pruneを送り返します。

handle_broadcast(false, _MessageId, _Message, _Mod, _Round, Root, From, State) -> %% stale msg
    State1 = add_lazy(From, Root, State),
    send({prune, Root, node()}, From),
    State1;

pruneを受け取ったらlazyリストに入れます。

handle_cast({prune, Root, From}, State) ->
    State1 = add_lazy(From, Root, State),
    {noreply, State1};

これでスライドの35ページの図になるわけです。

eagerリストとlazyリスト

add_eager/3とadd_lazy/3でeagerリストとlazyリストに追加します。eagerを追加した場合、lazyから抜きます。また、lazyに追加した場合にはeagerから抜きます。

add_eager(From, Root, State) ->
    update_peers(From, Root, fun ordsets:add_element/2, fun ordsets:del_element/2, State).

add_lazy(From, Root, State) ->
    update_peers(From, Root, fun ordsets:del_element/2, fun ordsets:add_element/2, State).

障害時は?

途中経路で障害が起こったときは、lazyリストを使います。

lazyリスト

lazyは障害に備えるために管理しているノードです。schedule_lazy_tick/0 でメッセージが最初に送られてから1000msec後にlazyを処理します。

schedule_lazy_tick() ->
    schedule_tick(lazy_tick, broadcast_lazy_timer, 1000).

lazyはいろいろ経由したあとにsend_lazy/4で i_have メッセージをlazyリストに対して送ります。

send_lazy(MessageId, Mod, Round, Root, Peer) ->
    send({i_have, MessageId, Mod, Round, Root, node()}, Peer).

i_haveメッセージ

i_haveを受け取ったら、すでに受け取っているかどうかをチェックします。

すでに受け取っている(stale)場合は特になにもしません。これにより、通常時はなにも無いわけです。

そうではない場合、つまり見たことがないメッセージをi_haveで受け取った場合、graftメッセージが送信元に送られます。

handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have
    %% TODO: don't graft immediately
    send({graft, MessageId, Mod, Round, Root, node()}, From),
    add_eager(From, Root, State).

graft

graftメッセージを受け取るとgraftを試みます。

handle_cast({graft, MessageId, Mod, Round, Root, From}, State) ->
    Result = Mod:graft(MessageId),
    State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State),
    {noreply, State1};

すでに受け取っていた場合はack_outstandingを返します。

handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
    ack_outstanding(MessageId, Mod, Round, Root, From, State);

ackを受け取ったらoutstandingからメッセージを削除します。

受け取っていないメッセージがgraftで送られてきた場合、なにかがおかしいようです。もう一度最初からメッセージを送ります。

handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
    %% we don't ack outstanding here because the broadcast may fail to be delivered
    %% instead we will allow the i_have to be sent once more and let the subsequent
    %% ignore serve as the ack.
    State1 = add_eager(From, Root, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, From),
    State1;

outstandingがあふれる?

outstandingには送られてないメッセージが入っているので、そのうちあふれてしまうのでは、と思うのですが、

ring_updateが来た時にneighbors_down/2が呼ばれ、この中でeagerとlazyから落ちたノードをはずし、outstandingも消します。だからあふれる心配は無いわけですね。

障害から復帰

障害が起きていたノードが復帰した場合、ring_updateが走り、最初の init_peers/1 からやり直されます。

まとめ

Riak 2.0で入るツリー状の経路を通ってブロードキャストするプロトコルについてriak_coreを読んでみました。

例えば1000台とかになるとgossipじゃなくてplumtreeの方がいいんでしょうね。(元論文には障害率とか詳しく載ってます)

しかしerlangは読みやすいなぁ。