What I have studied about riak's handoff

For a source-code reading session, we decided to examine riak's handoff in detail.

These are notes I wrote for myself to grasp the flow of processing, so they are probably not much use to anyone else.

About handoff

When ring_update is called and the ring is updated, partition ownership is rearranged across the nodes, so the range of the ring that each vnode is responsible for changes too. The data each vnode holds must then be sent to the vnode that is newly responsible for it. This transfer is called handoff.
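
As a concrete picture of "data moving to the newly responsible vnode", here is a minimal sketch that compares an old and a new ownership assignment and lists the partitions whose owner changes. It assumes a simplified ring given as a list of {PartitionIndex, OwnerNode} pairs; the module ring_diff_sketch and its transfers/2 function are hypothetical. In riak_core itself the planned transfers are kept in the ring and read with riak_core_ring:pending_changes/1 (step 4 below), not computed like this.

```erlang
-module(ring_diff_sketch).
-export([transfers/2]).

%% Old and New are lists of {Index, Node} pairs (a simplified,
%% hypothetical ring representation, not riak_core's ring record).
%% Returns {Index, FromNode, ToNode} for every partition whose owner
%% changes, i.e. every partition whose data would need a handoff.
transfers(Old, New) ->
    NewOwners = maps:from_list(New),
    [{Idx, From, To} || {Idx, From} <- lists:sort(Old),
                        %% indices missing from New are skipped
                        {ok, To} <- [maps:find(Idx, NewOwners)],
                        To =/= From].
```

For example, transfers([{0, a}, {1, a}], [{0, a}, {1, b}]) returns [{1, a, b}]: only partition 1 has to be handed off, from node a to node b.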

Process flow

  1. riak_core_vnode_manager:schedule_management_timer/0

    A timer is set with erlang:send_after/3, so handle_info/2 is called with a management_tick message at each interval.

    .. code-block:: erlang

        schedule_management_timer() ->
            ManagementTick = app_helper:get_env(riak_core,
                                                vnode_management_timer, 10000),
            erlang:send_after(ManagementTick, ?MODULE, management_tick).

  2. riak_core_vnode_manager:handle_info(management_tick, State0)

    This function is called at regular intervals and checks whether the ring has changed.

    .. code-block:: erlang

        RingID = riak_core_ring_manager:get_ring_id(),
        {ok, Ring, CHBin} = riak_core_ring_manager:get_raw_ring_chashbin(),
        State = maybe_ring_changed(RingID, Ring, CHBin, State0)

    The ring update is handled in maybe_ring_changed/4 (it calls ring_changed/3 when the ring has changed). After that, the function goes on to check the ring, for example by calling check_repairs/1.

  3. riak_core_vnode_manager:maybe_ring_changed/4

    It compares RingID with LastID (the stored last_ring_id) to determine whether the ring has changed. If they differ, it calls ring_changed/3.

    .. code-block:: erlang

        maybe_ring_changed(RingID, Ring, CHBin, State = #state{last_ring_id = LastID}) ->
            case RingID of
                LastID ->
                    maybe_ensure_vnodes_started(Ring),
                    State;
                _ ->
                    ensure_vnodes_started(Ring),
                    State2 = ring_changed(Ring, CHBin, State),
                    State2#state{last_ring_id = RingID}
            end.

    TODO: find out why this is enough to tell whether the ring has changed.

    (03/03 postscript): RingID is not an ID that is unique across the whole ring; it is a per-node ID that each node keeps for itself, and it is incremented whenever there is a change. So if RingID and LastID differ, a change has occurred.

  4. riak_core_vnode_manager:ring_changed/3

    .. code-block:: erlang

        State2 = update_forwarding(AllVNodes, Mods, Ring, State),

        %% Update handoff state
        State3 = update_handoff(AllVNodes, Ring, CHBin, State2),

        %% Trigger ownership transfers.
        Transfers = riak_core_ring:pending_changes(Ring),
        trigger_ownership_handoff(Transfers, Mods, Ring, State3),

    It updates the forwarding state and the handoff state, then calls trigger_ownership_handoff/4. Here, "state" records whether each vnode should forward requests or perform a handoff.

  5. riak_core_vnode_manager:trigger_ownership_handoff/4

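    After throttling the pending transfers with limit_ownership_handoff/2, it keeps those that are still awaiting, whose current owner is this node (Node =:= node()), and whose module Mod is not yet in the completed list CMods, then calls maybe_trigger_handoff/3 for each.

    .. code-block:: erlang

        Throttle = limit_ownership_handoff(Transfers, IsResizing),
        Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
                                  Mod <- Mods,
                                  S =:= awaiting,
                                  Node =:= node(),
                                  not lists:member(Mod, CMods)],
        [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],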

  6. riak_core_vnode_manager:maybe_trigger_handoff/3

    Depending on the handoff entry stored for {Mod, Idx}, it calls riak_core_vnode:trigger_handoff/3 (resize), riak_core_vnode:trigger_delete/1 (delete), or riak_core_vnode:trigger_handoff/2 (ordinary handoff to TargetNode).

    .. code-block:: erlang

        case dict:find({Mod, Idx}, HO) of
            {ok, '$resize'} ->
                {ok, Ring} = riak_core_ring_manager:get_my_ring(),
                case riak_core_ring:awaiting_resize_transfer(Ring, {Idx, node()}, Mod) of
                    undefined -> ok;
                    {TargetIdx, TargetNode} ->
                        riak_core_vnode:trigger_handoff(Pid, TargetIdx, TargetNode)
                end;
            {ok, '$delete'} ->
                riak_core_vnode:trigger_delete(Pid);
            {ok, TargetNode} ->
                riak_core_vnode:trigger_handoff(Pid, TargetNode),
                ok;

  7. riak_core_vnode:trigger_handoff/3

    Sends a trigger_handoff event to the vnode's gen_fsm.

    .. code-block:: erlang

        trigger_handoff(VNode, TargetIdx, TargetNode) ->
            gen_fsm:send_all_state_event(VNode, {trigger_handoff, TargetIdx, TargetNode}).

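  8. riak_core_vnode:active({trigger_handoff, TargetIdx, TargetNode}, State)

    Since the event is sent with send_all_state_event, it first arrives in handle_event/3, which appears to hand it to active/2. active/2 then calls maybe_handoff/3.

    .. code-block:: erlang

        active({trigger_handoff, TargetIdx, TargetNode}, State) ->
            maybe_handoff(TargetIdx, TargetNode, State);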

  9. riak_core_vnode:maybe_handoff/3

    It determines HOType from Resizing (whether the ring is being resized) and Primary (whether this node is a primary owner of the index), for example resize_transfer or hinted_handoff, and calls start_handoff/4 with that HOType.

  10. riak_core_vnode:start_handoff/4

    Calls start_outbound/4.

  11. riak_core_vnode:start_outbound/4

    Calls riak_core_handoff_manager:add_outbound/7.

    .. code-block:: erlang

        case riak_core_handoff_manager:add_outbound(HOType, Mod, Idx, TargetIdx, TargetNode, self(), Opts) of
            {ok, Pid} ->
                State#state{handoff_pid = Pid,
                            handoff_type = HOType,
                            handoff_target = {TargetIdx, TargetNode}};

  12. riak_core_handoff_manager:add_outbound/7

    Calls send_handoff. (The call below passes six arguments; it ends up in send_handoff/8, described next.)

    .. code-block:: erlang

        case send_handoff(Type, {Mod, SrcIdx, TargetIdx}, Node, Pid, HS, Opts) of
            {ok, Handoff = #handoff_status{transport_pid = Sender}} ->
                HS2 = HS ++ [Handoff],
                {reply, {ok, Sender}, State#state{handoffs = HS2}};

  13. riak_core_handoff_manager:send_handoff/8

    It is also called from riak_core_handoff_manager:xfer/3 and from the handle_cast clause for {send_handoff, ...}.

    Inside, it first checks whether another handoff is already running (the ShouldHandoff part).

    If the handoff can actually run, it starts the sender process by calling riak_core_handoff_sender_sup:start_sender and then monitors it. Before that, the filter and related options are set according to the handoff type (HOType).

    .. code-block:: erlang

        case HOType of
            repair ->
                HOFilter = Filter,
                HOAcc0 = undefined,
                HONotSentFun = undefined;
            resize_transfer ->
                {ok, Ring} = riak_core_ring_manager:get_my_ring(),
                HOFilter = resize_transfer_filter(Ring, Mod, Src, Target),
                HOAcc0 = ordsets:new(),
                HONotSentFun = resize_transfer_notsent_fun(Ring, Mod, Src);
            _ ->
                HOFilter = none,
                HOAcc0 = undefined,
                HONotSentFun = undefined
        end,
        HOOpts = [{filter, HOFilter},
                  {notsent_acc0, HOAcc0},
                  {notsent_fun, HONotSentFun} | BaseOpts],
        {ok, Pid} = riak_core_handoff_sender_sup:start_sender(HOType,
                                                             Mod,
                                                             Node,
                                                             Vnode,
                                                             HOOpts),
        PidM = monitor(process, Pid),

  14. riak_core_handoff_sender_sup:start_sender

    It starts the sender via start_link, which calls start_fold/5.

  15. riak_core_handoff_sender:start_fold/5

    First, maybe_call_handoff_started/3 makes a final check that the handoff is allowed to proceed.

    Next, it actually connects to the handoff destination.

    .. code-block:: erlang

                true ->
                    {ok, Skt} = gen_tcp:connect(TNHandoffIP, Port, SockOpts, 15000),
                    {Skt, gen_tcp}
            end,

    It then sends the first message, a PT_MSG_OLDSYNC frame carrying the vnode module name.

    .. code-block:: erlang

        ModBin = atom_to_binary(Module, utf8),
        Msg = <<?PT_MSG_OLDSYNC:8, ModBin/binary>>,
        ok = TcpMod:send(Socket, Msg),

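    It then waits for the sync reply from the destination. Receiving it means the destination has not refused the transfer; a closed socket is treated as a refusal because the receiver hit its concurrency limit (max_concurrency).

    .. code-block:: erlang

        case TcpMod:recv(Socket, 0, RecvTimeout) of
            {ok, [?PT_MSG_OLDSYNC | <<"sync">>]} -> ok;
            {error, closed} -> exit({shutdown, max_concurrency});
            {error, timeout} -> exit({shutdown, timeout})
        end,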

    It checks whether the receiving node supports batched messages.

    .. code-block:: erlang

        RemoteSupportsBatching = remote_supports_batching(TargetNode),

    It sends the target partition ID:

    .. code-block:: erlang

        M = <<?PT_MSG_INIT:8, TargetPartition:160/integer>>,
        ok = TcpMod:send(Socket, M),

    It builds a fold_req, passing visit_item/3 as the fold function; visit_item/3 is where the data is actually sent.

    .. code-block:: erlang

        Req = riak_core_util:make_fold_req(
                fun visit_item/3,
                #ho_acc{ack = 0,

    It then calls sync_command. The comment here is a little worrying, too:

    .. code-block:: erlang

        %% IFF the vnode is using an async worker to perform the fold
        %% then sync_command will return error on vnode crash,
        %% otherwise it will wait forever but vnode crash will be
        %% caught by handoff manager.  I know, this is confusing, a
        %% new handoff system will be written soon enough.

        AccRecord0 = riak_core_vnode_master:sync_command({SrcPartition, SrcNode},
                                                         Req,
                                                         VMaster, infinity),

    When the fold finishes, any items still left in the buffer are sent with send_objects/2.

    .. code-block:: erlang

        %% Send any straggler entries remaining in the buffer:
        AccRecord = send_objects(AccRecord0#ho_acc.item_queue, AccRecord0),

    If there was no error, it sends a final sync message and waits for the reply. Once the reply comes back, the transfer has succeeded.

    .. code-block:: erlang

        lager:debug("~p ~p Sending final sync",
                    [SrcPartition, Module]),
        ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),

        case TcpMod:recv(Socket, 0, RecvTimeout) of
            {ok, [?PT_MSG_SYNC | <<"sync">>]} ->
                lager:debug("~p ~p Final sync received",
                            [SrcPartition, Module]);
            {error, timeout} -> exit({shutdown, timeout})
        end,

    Finally, depending on the type, it sends a resize_transfer_complete or handoff_complete event to the parent vnode (nothing is sent for repair), and the handoff is over.

    .. code-block:: erlang

        case Type of
            repair -> ok;
            resize_transfer -> gen_fsm:send_event(ParentPid, {resize_transfer_complete,
                                                              NotSentAcc});
            _ -> gen_fsm:send_event(ParentPid, handoff_complete)
        end;

  16. riak_core_handoff_sender:visit_item/3

    visit_item/3 handles two cases: items sent synchronously and items sent asynchronously.

    In the synchronous case, it sends a message, waits for the reply with recv before continuing, and resets the ack counter to 0:

    .. code-block:: erlang

        case TcpMod:send(Sock, M) of
            ok ->
                case TcpMod:recv(Sock, 0, RecvTimeout) of
                    {ok, [?PT_MSG_OLDSYNC | <<"sync">>]} ->
                        Acc2 = Acc#ho_acc{ack = 0, error = ok, stats = Stats3},
                        visit_item(K, V, Acc2);

    Otherwise, the item is passed through the filter and encoded, and the function decides whether to batch it or send it right away.

    .. code-block:: erlang

        case Module:encode_handoff_item(K, V) of
            %% ...
                Acc;
            %% ...
                    case ItemQueueByteSize2 =< HandoffBatchThreshold of
                        true -> Acc2#ho_acc{item_queue = ItemQueue2};
                        false -> send_objects(ItemQueue2, Acc2)
                    end;

    When UseBatching is true, items are collected in the queue as long as the queue's total byte size stays within HandoffBatchThreshold; once the threshold is exceeded, the queue is sent with send_objects/2.

  17. riak_core_handoff_sender:send_objects/2

    This is the function that actually sends the data:

    .. code-block:: erlang

        M = <<?PT_MSG_BATCH:8, ObjectList/binary>>,

        NumBytes = byte_size(M),

        Stats2 = incr_bytes(incr_objs(Stats, NObjects), NumBytes),
        Stats3 = maybe_send_status({Module, SrcPartition, TargetPartition}, Stats2),

        case TcpMod:send(Sock, M) of
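
Steps 1 to 3 boil down to a common gen_server pattern: a timer message that re-arms itself, and an ID comparison that decides whether the expensive ring_changed/3 work runs. The following is a minimal sketch of that pattern only, not riak_core_vnode_manager itself. The module name mgmt_tick_sketch, the GetRingId fun (a stand-in for riak_core_ring_manager:get_ring_id/0) and the changes counter are hypothetical, and the tick is sent to self() rather than to a registered name.

```erlang
-module(mgmt_tick_sketch).
-behaviour(gen_server).

-export([start_link/2, ring_changes/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% GetRingId is a hypothetical zero-arity fun standing in for
%% riak_core_ring_manager:get_ring_id/0.
-record(state, {get_ring_id, interval, last_ring_id = undefined, changes = 0}).

start_link(GetRingId, Interval) ->
    gen_server:start_link(?MODULE, {GetRingId, Interval}, []).

%% How many times the "ring changed" branch has run so far.
ring_changes(Pid) ->
    gen_server:call(Pid, ring_changes).

init({GetRingId, Interval}) ->
    State = #state{get_ring_id = GetRingId, interval = Interval},
    schedule_management_timer(State),
    {ok, State}.

handle_call(ring_changes, _From, State) ->
    {reply, State#state.changes, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Like step 2: on every tick, read the ring ID, compare, re-arm the timer.
handle_info(management_tick, State0 = #state{get_ring_id = GetRingId}) ->
    State = maybe_ring_changed(GetRingId(), State0),
    schedule_management_timer(State),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

%% Like step 1, except the tick goes to self() instead of ?MODULE.
schedule_management_timer(#state{interval = Interval}) ->
    erlang:send_after(Interval, self(), management_tick).

%% Like step 3: the same ID means no change; a new ID means the ring changed.
maybe_ring_changed(RingID, State = #state{last_ring_id = RingID}) ->
    State;
maybe_ring_changed(RingID, State = #state{changes = N}) ->
    %% ring_changed/3 would run here in riak_core_vnode_manager.
    State#state{last_ring_id = RingID, changes = N + 1}.
```

In the first clause of maybe_ring_changed/2 the variable RingID appears twice, so the clause only matches when the new ID equals last_ring_id. The case RingID of LastID -> ... in step 3 works the same way: LastID is already bound, so that branch is taken only when the two IDs are equal, and any other ID falls through to the ring_changed/3 branch.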
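
Steps 9 and 13 both branch on the handoff type. The sketch below pulls those two decisions out as plain functions so the cases are easy to see. It is a sketch under assumptions: the ownership_transfer result for a non-resizing primary is my reading (the notes above only name resize_transfer and hinted_handoff), the resize filter is a placeholder that accepts every key instead of resize_transfer_filter/4, and the notsent fun simply collects keys into an ordset. The module ho_type_sketch is hypothetical.

```erlang
-module(ho_type_sketch).
-export([ho_type/2, ho_opts/2]).

%% Sketch of the HOType decision in riak_core_vnode:maybe_handoff/3 (step 9).
%% Resizing: is the ring being resized?  Primary: is this node a primary
%% owner of the index?  The ownership_transfer clause is an assumption.
ho_type(true, _Primary) -> resize_transfer;
ho_type(false, true) -> ownership_transfer;
ho_type(false, false) -> hinted_handoff.

%% Sketch of the per-type options built in send_handoff/8 (step 13).
%% Filter is the filter passed in by the caller (used for repair).
ho_opts(repair, Filter) ->
    [{filter, Filter}, {notsent_acc0, undefined}, {notsent_fun, undefined}];
ho_opts(resize_transfer, _Filter) ->
    %% Placeholder filter (accepts every key) instead of
    %% resize_transfer_filter/4; the notsent fun just collects keys.
    [{filter, fun(_Key) -> true end},
     {notsent_acc0, ordsets:new()},
     {notsent_fun, fun(Key, Acc) -> ordsets:add_element(Key, Acc) end}];
ho_opts(_Other, _Filter) ->
    [{filter, none}, {notsent_acc0, undefined}, {notsent_fun, undefined}].
```

Keeping the decision as a function of two booleans makes it clear that resizing takes priority over the primary check.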
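
The sender side of step 15 and send_objects/2 in step 17 exchange a handful of binary frames: a one-byte message type followed by a payload. Below is a minimal sketch that builds and parses frames of the same shapes. The numeric PT_MSG_* values are placeholders picked for the sketch (the real ones are defined in riak_core's handoff header file), and the module ho_wire_sketch is hypothetical. The odd-looking pattern [?PT_MSG_SYNC | <<"sync">>] in the recv calls is the shape gen_tcp returns for a binary socket with the {header, 1} option, where the first byte arrives as a list head; from_header/1 assumes that is the option the handoff sockets use.

```erlang
-module(ho_wire_sketch).
-export([oldsync_msg/1, init_msg/1, batch_msg/1, sync_msg/0,
         decode/1, from_header/1]).

%% Placeholder message-type codes (assumed values for this sketch only).
-define(PT_MSG_INIT, 0).
-define(PT_MSG_OBJ, 1).
-define(PT_MSG_OLDSYNC, 2).
-define(PT_MSG_SYNC, 3).
-define(PT_MSG_BATCH, 5).

%% First message: the vnode module name (step 15).
oldsync_msg(Module) when is_atom(Module) ->
    ModBin = atom_to_binary(Module, utf8),
    <<?PT_MSG_OLDSYNC:8, ModBin/binary>>.

%% Target partition as a 160-bit integer (step 15).
init_msg(TargetPartition) when is_integer(TargetPartition) ->
    <<?PT_MSG_INIT:8, TargetPartition:160/integer>>.

%% A batch payload is a term_to_binary'd list of objects (step 17).
batch_msg(Objects) when is_list(Objects) ->
    ObjectList = term_to_binary(Objects),
    <<?PT_MSG_BATCH:8, ObjectList/binary>>.

sync_msg() ->
    <<?PT_MSG_SYNC:8>>.

%% Parse a complete frame back into a tagged term.
decode(<<?PT_MSG_INIT:8, Partition:160/integer>>) -> {init, Partition};
decode(<<?PT_MSG_OBJ:8, Obj/binary>>) -> {obj, Obj};
decode(<<?PT_MSG_OLDSYNC:8, ModBin/binary>>) -> {oldsync, ModBin};
decode(<<?PT_MSG_SYNC:8, _/binary>>) -> sync;
decode(<<?PT_MSG_BATCH:8, Bin/binary>>) -> {batch, binary_to_term(Bin)}.

%% With {header, 1}, recv yields [Type | Rest]; rebuild the frame first.
from_header([Type | Rest]) when is_integer(Type), is_binary(Rest) ->
    decode(<<Type:8, Rest/binary>>).
```

init_msg/1 always produces 21 bytes: one type byte plus 160 bits (20 bytes) of partition index.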
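
The batching rule in steps 16 and 17 (keep queuing encoded items while the queue's byte size is within HandoffBatchThreshold, otherwise send the whole queue, and send any stragglers when the fold ends) can be isolated as a pure accumulator. In this minimal sketch, flush/1 records each batch instead of writing it to a socket with send_objects/2; the module ho_batch_sketch and its record are hypothetical.

```erlang
-module(ho_batch_sketch).
-export([new/1, add_item/2, flush/1, sent_batches/1]).

%% Hypothetical accumulator, loosely modelled on the item_queue and
%% item_queue_byte_size fields of #ho_acc{}; sent records flushed batches.
-record(acc, {threshold, queue = [], queue_bytes = 0, sent = []}).

new(Threshold) when is_integer(Threshold), Threshold >= 0 ->
    #acc{threshold = Threshold}.

%% Queue an encoded item; if the queued byte count now exceeds the
%% threshold, flush the whole queue (the role of send_objects/2).
add_item(Bin, Acc = #acc{threshold = T, queue = Q, queue_bytes = B})
  when is_binary(Bin) ->
    Q2 = [Bin | Q],
    B2 = B + byte_size(Bin),
    Acc2 = Acc#acc{queue = Q2, queue_bytes = B2},
    case B2 =< T of
        true -> Acc2;
        false -> flush(Acc2)
    end.

%% Send whatever is queued, e.g. the stragglers left after the fold.
flush(Acc = #acc{queue = []}) ->
    Acc;
flush(Acc = #acc{queue = Q, sent = Sent}) ->
    Batch = lists:reverse(Q),    % restore arrival order
    Acc#acc{queue = [], queue_bytes = 0, sent = Sent ++ [Batch]}.

sent_batches(#acc{sent = Sent}) ->
    Sent.
```

With a threshold of 5 bytes, adding <<"abc">> only queues it (3 bytes), adding <<"def">> pushes the queue to 6 bytes and flushes both as one batch, and a final flush/1 sends whatever is left.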

On the receiving side

  1. riak_core_handoff_listener:new_connection/2

    riak_core_handoff_listener implements the gen_nb_server behaviour. When it accepts a connection, it calls riak_core_handoff_manager:add_inbound/1. If that returns an error (for example max_concurrency), it closes the socket, which is what the sender sees as {error, closed}.

    .. code-block:: erlang

        new_connection(Socket, State = #state{ssl_opts = SslOpts}) ->
            case riak_core_handoff_manager:add_inbound(SslOpts) of
                {ok, Pid} ->
                    gen_tcp:controlling_process(Socket, Pid),
                    ok = riak_core_handoff_receiver:set_socket(Pid, Socket),
                    {ok, State};
                {error, _Reason} ->
                    riak_core_stat:update(rejected_handoffs),
                    gen_tcp:close(Socket),
                    {ok, State}
            end.

  2. riak_core_handoff_manager:add_inbound/1

    Calls receive_handoff/1; if it returns an error, that error is passed back to the caller as the reply:

    .. code-block:: erlang

        %% ...
                {reply, Error, State}
        end;

  3. riak_core_handoff_manager:receive_handoff/1

    It calls handoff_concurrency_limit_reached/0 to check the number of concurrent handoffs, then calls riak_core_handoff_receiver_sup:start_receiver to start a receiver process.

    .. code-block:: erlang

        case handoff_concurrency_limit_reached() of
            true ->
                {error, max_concurrency};
            false ->
                {ok, Pid} = riak_core_handoff_receiver_sup:start_receiver(SSLOpts),
                PidM = monitor(process, Pid),

                %% successfully started up a new receiver
                {ok, #handoff_status{transport_pid = Pid,

  4. riak_core_handoff_receiver processes each message according to its type.

    INIT: the message carries the target partition as a 160-bit integer; the receiver looks up the local vnode for that partition and records it.

    .. code-block:: erlang

        process_message(?PT_MSG_INIT, MsgData, State = #state{vnode_mod = VNodeMod,
                                                              peer = Peer}) ->
            <<Partition:160/integer>> = MsgData,
            lager:info("Receiving handoff data for partition ~p:~p from ~p",
                       [VNodeMod, Partition, Peer]),
            {ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod),
            Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}},
                    {vnode_pid, VNode}],
            riak_core_handoff_manager:set_recv_data(self(), Data),
            State#state{partition = Partition, vnode = VNode};

    SYNC: the receiver simply answers with a sync message.

    .. code-block:: erlang

        process_message(?PT_MSG_SYNC, _MsgData, State = #state{sock = Socket,
                                                               tcp_mod = TcpMod}) ->
            TcpMod:send(Socket, <<?PT_MSG_SYNC:8, "sync">>),
            State;

    BATCH: it is simply handled as a sequence of PT_MSG_OBJ messages.

    .. code-block:: erlang

        process_message(?PT_MSG_BATCH, MsgData, State) ->
            lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
                        State,
                        binary_to_term(MsgData));

    OBJ: the received object (Obj) is forwarded to the vnode as a gen_fsm all-state event with a 60-second timeout; an error reply makes the receiver exit.

    .. code-block:: erlang

        case gen_fsm:sync_send_all_state_event(VNode, Msg, 60000) of
            ok ->
                State#state{count = Count + 1};
            E = {error, _} ->
                exit(E)
        end;
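
Step 4's BATCH clause is a small trick worth isolating: the batch payload decodes to a list, and each element is fed back into process_message/3 as if it had arrived as its own PT_MSG_OBJ. The sketch below keeps that fold but replaces the vnode call with a hypothetical {Count, Received} state; the message codes are placeholder values and the module ho_recv_sketch is hypothetical.

```erlang
-module(ho_recv_sketch).
-export([process_message/3]).

%% Placeholder message-type codes (assumed values for this sketch only).
-define(PT_MSG_OBJ, 1).
-define(PT_MSG_BATCH, 5).

%% State is a hypothetical {Count, Received} pair. The real receiver keeps
%% a #state{} record and forwards each object to the vnode with
%% gen_fsm:sync_send_all_state_event/3 instead of collecting it.
process_message(?PT_MSG_BATCH, MsgData, State) ->
    %% A batch is just a list of objects, each handled like a PT_MSG_OBJ.
    lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
                State,
                binary_to_term(MsgData));
process_message(?PT_MSG_OBJ, Obj, {Count, Received}) ->
    {Count + 1, [Obj | Received]}.
```

Because the batch clause reuses the OBJ clause, per-object bookkeeping (the count, the vnode call) lives in exactly one place.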
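
Step 3 gates every inbound handoff on a concurrency check. Below is a minimal sketch of such a gate. It assumes, without my having checked the source, that the limit is compared against the active senders plus the active receivers, counted in the proplist format that supervisor:count_children/1 returns; the module ho_limit_sketch and the StartFun argument (a stand-in for riak_core_handoff_receiver_sup:start_receiver/1) are hypothetical.

```erlang
-module(ho_limit_sketch).
-export([limit_reached/3, receive_handoff/4]).

%% Receivers and Senders are proplists shaped like the result of
%% supervisor:count_children/1, e.g. [{specs,1},{active,2},...].
%% Assumption: both directions count toward one shared limit.
limit_reached(Limit, Receivers, Senders) ->
    Active = proplists:get_value(active, Receivers, 0)
           + proplists:get_value(active, Senders, 0),
    Active >= Limit.

%% Mirrors the shape of receive_handoff/1 in step 3.
receive_handoff(Limit, Receivers, Senders, StartFun) ->
    case limit_reached(Limit, Receivers, Senders) of
        true -> {error, max_concurrency};
        false -> StartFun()
    end.
```

When the gate returns {error, max_concurrency}, the listener in step 1 closes the socket, which is why the sender in step 15 of the sending flow maps {error, closed} to max_concurrency.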

Schematic flow

TBD

riak_core_vnode_proxy

Tip

3/3 postscript

vnode_proxy is the point that every request to a vnode passes through. It seems this proxy can answer on the vnode's behalf, for example when the vnode cannot respond. (I have not actually traced the code, though.)

In riak_core_ring_handler.erl, maybe_start_vnode_proxies(Ring) is called:

.. code-block:: erlang

    handle_event({ring_update, Ring}, State) ->
        maybe_start_vnode_proxies(Ring),
        maybe_stop_vnode_proxies(Ring),
        {ok, State}.

In maybe_start_vnode_proxies, if the future ring is larger than the current one, proxies are started for it. However, maybe_stop_vnode_proxies is called right after that, and I do not quite understand why.
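
One possible explanation for the start-then-stop sequence is that the two calls act on different conditions. The sketch below encodes that guess, which I have not confirmed against riak_core_ring_handler. It assumes proxies are started for every index of the future ring only when the future ring has more partitions, and that stopping is skipped while ring changes are still pending and otherwise removes proxies whose index is not in the current ring. The module proxy_sketch and its plain-list inputs are hypothetical.

```erlang
-module(proxy_sketch).
-export([to_start/2, to_stop/3]).

%% CurrentIdxs and FutureIdxs are lists of partition indices (hypothetical
%% simplified inputs). Proxies are started only when the future ring has
%% more partitions than the current one, i.e. during a ring resize.
to_start(CurrentIdxs, FutureIdxs) when length(FutureIdxs) > length(CurrentIdxs) ->
    FutureIdxs;
to_start(_CurrentIdxs, _FutureIdxs) ->
    [].

%% Assumption: nothing is stopped while changes are pending; once the ring
%% has settled, proxies for indices outside the current ring are stopped.
to_stop(_Running, _CurrentIdxs, PendingChanges) when PendingChanges =/= [] ->
    [];
to_stop(Running, CurrentIdxs, []) ->
    Running -- CurrentIdxs.
```

Under these assumptions, the stop call made right after the start call does nothing while the resize is still pending, so the new proxies survive; they would only be cleaned up once the ring has settled.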

riak_core_vnode_worker

Tip

3/3 postscript

vnode_worker reportedly enables asynchronous requests: when a vnode has to perform a time-consuming operation, it starts this worker and lets the worker do the job. (I have not actually traced the code, though.)
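
The idea described for vnode_worker (hand slow work to another process so the vnode itself stays responsive) can be shown with plain processes. This is a minimal sketch of the idea only, not riak_core_vnode_worker's API; the module async_worker_sketch and its functions are hypothetical. It is presumably also the kind of "async worker" that the sync_command comment quoted in step 15 refers to.

```erlang
-module(async_worker_sketch).
-export([start/0, request/2, stop/1]).

%% A toy "vnode" process that hands each job to a freshly spawned worker
%% so that it can keep receiving new requests while the job runs.
start() ->
    spawn(fun loop/0).

stop(VNode) ->
    VNode ! stop,
    ok.

%% Blocks the caller (not the vnode) until the worker replies.
request(VNode, Work) when is_function(Work, 0) ->
    Ref = make_ref(),
    VNode ! {work, self(), Ref, Work},
    receive
        {Ref, Result} -> Result
    after 5000 ->
        {error, timeout}
    end.

loop() ->
    receive
        {work, From, Ref, Work} ->
            %% The worker, not the vnode, runs the slow function and
            %% replies directly to the caller; the vnode loops at once.
            spawn(fun() -> From ! {Ref, Work()} end),
            loop();
        stop ->
            ok
    end.
```

A fast request therefore completes promptly even while a slow one is still running, because the vnode loop never waits for a job to finish.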