Reading the Plumtree implementation in Riak 2.0

In last year's Riak Advent Calendar, Shinohara wrote an article titled "Riak 2.0: クラスタ全体のデータ共有を効率" (Riak 2.0: efficient data sharing across the whole cluster).

According to that article, Riak 2.0 adds a second protocol on top of the gossip protocol. This new protocol communicates along a tree-shaped path, and its implementation is based on a paper called Plumtree.

The paper has also been introduced in some slides. I read them and got interested, so I am going to read riak_core and see how it is actually implemented.

Judging from the commit log, these changes first landed on August 1, 2013.

Warning

As usual, I may well have gotten things wrong. Comments are welcome.

TL;DR

  • From Riak 2.0 on, gossip-style broadcasting becomes more efficient

  • It is only enabled with 10 or more nodes

    01/23 postscript: This was a mistake! Plumtree is always used, and the gossip tree is only used to build the initial tree.

Terminology

Eager
Peers that a message is sent to immediately
Lazy
Peers that are only notified about a message later
Outstanding
A message announced to lazy peers whose ack has not come back yet
Plumtree
The protocol introduced here, which delivers messages along a tree-shaped route. The original paper calls it Epidemic Broadcast Trees, and I thought the name plumtree appeared only in the code. (1/14 postscript: it turns out the original paper does plainly say plumtree...)

riak_core_broadcast.erl

If you grep for eager, this file is the one that turns up. It implements broadcasting to the whole cluster.

The file contains a function called init_peers(Members). This function sets up the initial peers, which define the initial tree, and the setup depends on the number of members.

1 node
It is only ourselves, so nothing special happens
2 nodes
The two nodes just send to each other
3 to 4 nodes
Connect to each other (mesh)
5 to 9 nodes
Use the tree structure that gossip (riak_core_gossip.erl) uses as the initial tree
10 or more nodes
Use plumtree

In other words, I first read this as needing 10 or more nodes to get tree-shaped routing in 2.0. In fact, with fewer nodes the gossip tree is simply good enough as the initial tree structure. (I do wonder where the number 10 comes from.)

Note that init_peers/1 is also called from handle_cast({ring_update, Ring}, ...). As a result, this choice is made again every time the ring is updated.

Inside init_peers/1, riak_core_util:build_tree/3 is called, and that function is what actually builds the route.

riak_core_util:build_tree/3

build_tree/3 builds an N-ary tree from the list passed in. The fanout N depends on the case:

- 1 for mesh
- 2 for gossip
- round(math:log(M) + 1) for plumtree, where M is the number of members
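The table and formula above can be summarized in a small sketch. The helper initial_fanout/1 is hypothetical (the name is mine, not riak_core's). It maps a member count to the fanout that would be handed to build_tree/3. Note that math:log/1 is the natural logarithm.

```erlang
-module(plumtree_fanout).
-export([initial_fanout/1]).

%% Hypothetical helper (not part of riak_core) mirroring the table above.
%% Given the number of members, it returns the fanout N for build_tree/3.
%% For the 1- and 2-node cases, which build no tree, it returns an atom.
initial_fanout(1) -> only_self;              % nothing to send to
initial_fanout(2) -> the_other_node;         % the two just send to each other
initial_fanout(N) when N < 5 -> 1;           % 3 to 4 nodes: "mesh"
initial_fanout(N) when N < 10 -> 2;          % 5 to 9 nodes: the gossip tree
initial_fanout(N) -> round(math:log(N) + 1). % 10 or more nodes
```

With this formula, a 10-node cluster gets a fanout of 3 and a 1000-node cluster gets 8. The initial eager fanout therefore grows only logarithmically with cluster size.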

When init_peers/1 calls build_tree/3, the options it passes as the third argument contain [cycles]. With cycles, the leaves also get links back up to nodes near the top, so the links go both ways, i.e. something like a doubly linked structure (maybe...). For example:

Flat = [1,
        11, 12,
        111, 112, 121, 122,
        1111, 1112, 1121, 1122, 1211, 1212, 1221, 1222],

Flat is the list of nodes. Building a tree with fanout 2 and [cycles] from it gives:

CTree = [{1,    [  11,   12]},
         {11,   [ 111,  112]},
         {12,   [ 121,  122]},
         {111,  [1111, 1112]},
         {112,  [1121, 1122]},
         {121,  [1211, 1212]},
         {122,  [1221, 1222]},
         {1111, [   1,   11]},
         {1112, [  12,  111]},
         {1121, [ 112,  121]},
         {1122, [ 122, 1111]},
         {1211, [1112, 1121]},
         {1212, [1122, 1211]},
         {1221, [1212, 1221]},
         {1222, [1222,    1]}],

The leaves (1111 to 1222) point back to nodes from the start of the list. That is the cycles option at work.
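The construction above can be reproduced with a short re-implementation sketch. It rests on my reading of build_tree/3, not on a copy of riak_core_util. Under that reading, each node is visited in list order and takes the next N entries of a work list as its children. With cycles, the work list is the node list repeated. Applied to Flat with N = 2 and [cycles], this sketch yields exactly the CTree shown above.

```erlang
-module(build_tree_sketch).
-export([build_tree/3]).

%% Re-implementation sketch of the idea behind riak_core_util:build_tree/3
%% (my reading, not the riak_core source). Each node in Nodes, in order,
%% takes the next N entries of a work list as its children. With the
%% cycles option the work list is Nodes repeated N+1 times, so the leaves
%% wrap around and point back to nodes near the top.
build_tree(N, Nodes, Opts) ->
    Expanded = case lists:member(cycles, Opts) of
                   true  -> lists:append(lists:duplicate(N + 1, Nodes));
                   false -> Nodes
               end,
    {Tree, _Unused} =
        lists:foldl(
          fun(Node, {Acc, Work}) ->
                  %% Take up to N children; without cycles the work list
                  %% runs out and the leaves get an empty child list.
                  Len = erlang:min(N, length(Work)),
                  {Children, Rest} = lists:split(Len, Work),
                  {[{Node, Children} | Acc], Rest}
          end,
          {[], tl(Expanded)}, Nodes),
    orddict:from_list(Tree).
```

Without cycles, the work list is just the node list itself. For example, build_tree(2, [a, b, c], []) gives [{a, [b, c]}, {b, []}, {c, []}].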

With the initial tree in hand, let's return to broadcasting.

When a node actually broadcasts, it calls broadcast/2.

broadcast(Broadcast, Mod) ->
    {MessageId, Payload} = Mod:broadcast_data(Broadcast),
    gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).

Broadcast is the data and Mod is a module (module()). Mod's callbacks are run on every node in the ring, and they are defined by the riak_core_broadcast_handler behaviour.

Let's take a quick look at it.

riak_core_broadcast_handler.erl


broadcast_data
Returns a tuple of the message id and the payload
merge
Delivers the message locally. Returns false if it has already been received
is_stale
Returns true if the message has already been received
graft
Returns the message for the given message id. The message may already have been sent, in which case stale is returned
exchange
Triggers an exchange between the local handler and the given node. The exchange sends each side the messages that the other side is missing

Those are the five callbacks. Keeping them in mind makes the rest of the code easier to follow.

Note that exchange only covers messages that have already been received. A message still in flight, meaning sent but not yet received by anyone, is left to the next exchange.
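The following sketch makes the semantics of merge, is_stale and graft concrete with a hypothetical, purely functional stand-in. The module name toy_store and the explicit Store argument are assumptions for illustration only. The real callbacks (merge/2, is_stale/1, graft/1) keep their state elsewhere. exchange is left out.

```erlang
-module(toy_store).
-export([new/0, broadcast_data/1, merge/3, is_stale/2, graft/2]).

%% Hypothetical stand-in for a broadcast handler. The set of received
%% messages is passed explicitly as a map from message id to payload.
new() -> #{}.

%% In this sketch a broadcast is simply an {Id, Payload} tuple.
broadcast_data({Id, Payload}) -> {Id, Payload}.

%% Deliver locally: {true, Store1} for a new message,
%% {false, Store} if it has already been received.
merge(Id, Payload, Store) ->
    case is_stale(Id, Store) of
        true  -> {false, Store};
        false -> {true, Store#{Id => Payload}}
    end.

%% True if the message id has already been received.
is_stale(Id, Store) -> maps:is_key(Id, Store).

%% Look a message up by id for a peer that asked for it. The "stale"
%% answer of the real callback is not modelled; unknown ids give an error.
graft(Id, Store) ->
    case maps:find(Id, Store) of
        {ok, Payload} -> {ok, Payload};
        error         -> {error, not_found}
    end.
```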

Back to riak_core_broadcast.erl

The cast sent by broadcast/2 is handled first by this clause:

handle_cast({broadcast, MessageId, Message, Mod}, State) ->
    State1 = eager_push(MessageId, Message, Mod, State),
    State2 = schedule_lazy_push(MessageId, Mod, State1),
    {noreply, State2};

eager_push sends the message to the eager peers, and schedule_lazy_push arranges for the lazy peers to be notified later.

eager_push looks up the eager peers and sends them the message:

%% The very first push starts from ourselves
eager_push(MessageId, Message, Mod, State) ->
    eager_push(MessageId, Message, Mod, 0, node(), node(), State).

%% After that, forward the message along the eager list
eager_push(MessageId, Message, Mod, Round, Root, From, State) ->
    Peers = eager_peers(Root, From, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, Peers),
    State.

Receiving a broadcast

If the message has not been received yet

handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
    Valid = Mod:merge(MessageId, Message),
    State1 = handle_broadcast(Valid, MessageId, Message, Mod, Round, Root, From, State),
    {noreply, State1};

First, the message is delivered locally with merge/2 (described above). If it had not been received before, the node does the following:

- adds the sender to the eager list with add_eager/3
- increments Round
- pushes the message on to its eager peers and schedules a lazy push

handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
    State1 = add_eager(From, Root, State),
    State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1),
    schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2).

If the message has already been received

If the message has already been received, the sender is moved to the lazy list and a prune message is sent back to it.

handle_broadcast(false, _MessageId, _Message, _Mod, _Round, Root, From, State) -> %% stale msg
    State1 = add_lazy(From, Root, State),
    send({prune, Root, node()}, From),
    State1;

When a node receives prune, it moves the sender to its lazy list.

handle_cast({prune, Root, From}, State) ->
    State1 = add_lazy(From, Root, State),
    {noreply, State1};

This corresponds to the illustration on page 35 of the slides.
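The eager push and prune rules above can be simulated in a single process to see how a tree emerges. This is a toy model built on assumptions of my own: one root, symmetric links, FIFO delivery and no failures. It also keeps one eager/lazy set per node, whereas riak_core keeps them per root. Duplicate deliveries trigger prunes until the eager links form a tree rooted at the sender, and the pruned links end up lazy.

```erlang
-module(eager_prune_sim).
-export([run/2]).

%% Toy simulation of eager push + prune (not riak_core code).
%% Peers maps Node => [InitialEagerPeer]. Returns {Eager, Lazy}, two maps
%% Node => ordset, once the broadcast from Root has settled.
run(Root, Peers) ->
    Eager0 = maps:map(fun(_Node, Ps) -> ordsets:from_list(Ps) end, Peers),
    Lazy0 = maps:map(fun(_Node, _Ps) -> ordsets:new() end, Peers),
    %% The root has the message and pushes it to all of its eager peers.
    Queue = [{broadcast, Root, P} || P <- maps:get(Root, Eager0)],
    loop(Queue, [Root], Eager0, Lazy0).

loop([], _Seen, Eager, Lazy) ->
    {Eager, Lazy};
loop([{broadcast, From, To} | Rest], Seen, Eager, Lazy) ->
    case lists:member(To, Seen) of
        false ->
            %% New message: keep From as eager, forward to the other eager peers.
            Eager1 = add(To, From, Eager),
            Lazy1 = del(To, From, Lazy),
            Out = [{broadcast, To, P} || P <- maps:get(To, Eager1), P =/= From],
            loop(Rest ++ Out, [To | Seen], Eager1, Lazy1);
        true ->
            %% Duplicate: demote From to lazy and send it a prune.
            Eager1 = del(To, From, Eager),
            Lazy1 = add(To, From, Lazy),
            loop(Rest ++ [{prune, To, From}], Seen, Eager1, Lazy1)
    end;
loop([{prune, Sender, Receiver} | Rest], Seen, Eager, Lazy) ->
    %% On prune, the receiver demotes the sender to lazy.
    loop(Rest, Seen, del(Receiver, Sender, Eager), add(Receiver, Sender, Lazy)).

add(Node, Peer, Sets) ->
    maps:update_with(Node, fun(S) -> ordsets:add_element(Peer, S) end, Sets).

del(Node, Peer, Sets) ->
    maps:update_with(Node, fun(S) -> ordsets:del_element(Peer, S) end, Sets).
```

Take a triangle a, b, c where everyone starts as eager, with root a. The simulation ends with these eager sets:

- a: [b, c]
- b: [a]
- c: [a]

The link between b and c is pruned, and b and c now hold each other as lazy peers.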

Eager list and lazy list

add_eager/3 and add_lazy/3 add a peer to the eager or lazy list. Adding a peer to eager removes it from lazy, and adding it to lazy removes it from eager, so a peer is never in both.

add_eager(From, Root, State) ->
    update_peers(From, Root, fun ordsets:add_element/2, fun ordsets:del_element/2, State).

add_lazy(From, Root, State) ->
    update_peers(From, Root, fun ordsets:del_element/2, fun ordsets:add_element/2, State).

What happens on failure?

If a failure occurs along the way, the lazy list comes into play.

Lazy list

The lazy list holds peers that are kept in reserve in case of failures. The lazy processing runs 1000 ms after the message is first sent, and schedule_lazy_tick/0 is what schedules it.

schedule_lazy_tick() ->
    schedule_tick(lazy_tick, broadcast_lazy_timer, 1000).

After some bookkeeping, the lazy processing sends i_have messages to the peers in the lazy list with send_lazy/5.

send_lazy(MessageId, Mod, Round, Root, Peer) ->
    send({i_have, MessageId, Mod, Round, Root, node()}, Peer).

i_have message

When a node receives i_have, it checks whether it has already received that message.

If it has (the message is stale), nothing in particular happens. So in normal operation, i_have causes no further traffic.

Otherwise, the i_have announces a message the node has not seen yet. In that case the node sends a graft message back to the sender and adds the sender to its eager list.

handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have
    %% TODO: don't graft immediately
    send({graft, MessageId, Mod, Round, Root, node()}, From),
    add_eager(From, Root, State).

Graft

When a node receives a graft message, it tries to graft by calling Mod:graft/1.

handle_cast({graft, MessageId, Mod, Round, Root, From}, State) ->
    Result = Mod:graft(MessageId),
    State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State),
    {noreply, State1};

If graft returns stale (the message has already been sent), the node calls ack_outstanding.

handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
    ack_outstanding(MessageId, Mod, Round, Root, From, State);

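The ack removes the message from outstanding.

Graft may instead return the message itself ({ok, Message}). That means the peer asked for a message it never received, so something went wrong along the way. The node adds that peer to its eager list and sends it the broadcast again.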

handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
    %% we don't ack outstanding here because the broadcast may fail to be delivered
    %% instead we will allow the i_have to be sent once more and let the subsequent
    %% ignore serve as the ack.
    State1 = add_eager(From, Root, State),
    send({broadcast, MessageId, Message, Mod, Round, Root, node()}, From),
    State1;
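The i_have and graft steps can be sketched as two pure functions that each return the message to send. Several details are assumptions for illustration: the module name, the {Store, Eager} state, and the noop / {send, To, Msg} actions. The sketch also leaves out the stale result of graft and the outstanding bookkeeping.

```erlang
-module(lazy_repair).
-export([on_ihave/3, on_graft/3]).

%% Hypothetical sketch of the i_have / graft path (not riak_core code).
%% State is {Store, Eager}: Store maps message id => payload, and Eager
%% is an ordset of eager peers. Each function returns {Action, NewState}.

%% A node receives i_have for MsgId from From.
on_ihave(MsgId, From, {Store, Eager} = State) ->
    case maps:is_key(MsgId, Store) of
        true ->
            %% Already have it (stale): nothing happens.
            {noop, State};
        false ->
            %% Missing message: ask for it and promote From to eager.
            {{send, From, {graft, MsgId}},
             {Store, ordsets:add_element(From, Eager)}}
    end.

%% A node receives graft for MsgId from From.
on_graft(MsgId, From, {Store, Eager} = State) ->
    case maps:find(MsgId, Store) of
        {ok, Payload} ->
            %% Re-send the broadcast and make From eager again.
            {{send, From, {broadcast, MsgId, Payload}},
             {Store, ordsets:add_element(From, Eager)}};
        error ->
            {noop, State}
    end.
```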

Can outstanding grow without bound?

Outstanding holds messages whose ack has not come back yet, so I wondered whether it would keep growing over time.

When a ring_update arrives, neighbors_down/2 is called. It removes the nodes that went down from the eager and lazy lists and deletes their outstanding entries. So there is no need to worry about it growing without bound.
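Here is a minimal sketch of that cleanup. It assumes a flat state {Eager, Lazy, Outstanding}, in which Outstanding maps a peer to its pending message ids. riak_core keeps these structures per root, so the real ones differ.

```erlang
-module(outstanding_sketch).
-export([neighbors_down/2]).

%% Hypothetical sketch (not riak_core code). Removes the down nodes
%% from the eager and lazy ordsets and forgets their outstanding entries.
neighbors_down(Down, {Eager, Lazy, Outstanding}) ->
    DownSet = ordsets:from_list(Down),
    {ordsets:subtract(Eager, DownSet),
     ordsets:subtract(Lazy, DownSet),
     maps:without(Down, Outstanding)}.
```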

Recovering from failure

When a failed node comes back, a ring_update runs and everything starts over from init_peers/1.

Summary

Riak 2.0 introduces a protocol that broadcasts along a tree-shaped route, and I read through riak_core to see how it is implemented.

At something like 1000 nodes, you are better off with plumtree than with plain gossip. (The original paper lists failure rates and other figures in detail.)

Still, Erlang is easy to read.