What I learned about Riak's handoff
For a source-code reading session, I decided to examine Riak's handoff in detail.
This is a summary written for myself to grasp the flow of processing, so it is probably of little use to anyone else.
About handoff
When ring_update is called and the ring is updated, partitions are reassigned among nodes, so the range of the ring handled by each vnode also changes. The data that each vnode actually holds must then be sent to the vnode that is newly responsible for it. This transfer is called handoff.
Process flow
riak_core_vnode_manager:schedule_management_timer/0
A timer is set with send_after, and handle_info is called for every management_tick.
.. code-block:: erlang

   schedule_management_timer() ->
       ManagementTick = app_helper:get_env(riak_core,
                                           vnode_management_timer,
                                           10000),
       erlang:send_after(ManagementTick, ?MODULE, management_tick).
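To make the timer mechanism concrete for myself, here is a minimal sketch of a periodic tick built on erlang:send_after/3. The module name tick_sketch and the function run/2 are hypothetical and not part of riak_core. Because send_after delivers its message only once, the sketch re-arms the timer on every iteration; I am assuming the real manager does something equivalent after handling each tick.

```erlang
-module(tick_sketch).
-export([run/2]).

%% Handle N ticks, IntervalMs milliseconds apart, and return the
%% number of ticks that were handled.
run(IntervalMs, N) ->
    loop(IntervalMs, N, 0).

loop(_IntervalMs, N, Count) when Count >= N ->
    Count;
loop(IntervalMs, N, Count) ->
    %% send_after fires once, so the timer is armed again each round.
    erlang:send_after(IntervalMs, self(), management_tick),
    receive
        management_tick ->
            loop(IntervalMs, N, Count + 1)
    end.
```

Calling tick_sketch:run(10, 3) blocks for roughly 30 ms and returns the tick count.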
riak_core_vnode_manager:handle_info(management_tick, State0)
This function is called at regular intervals and checks whether the ring has changed.

.. code-block:: erlang

   RingID = riak_core_ring_manager:get_ring_id(),
   {ok, Ring, CHBin} = riak_core_ring_manager:get_raw_ring_chashbin(),
   State = maybe_ring_changed(RingID, Ring, CHBin, State0),

The ring update is handled inside maybe_ring_changed. After that, the function finishes by checking the ring further, calling check_repairs/1 and so on.
riak_core_vnode_manager:maybe_ring_changed/4
It compares RingID with last_ring_id to determine whether the ring has changed. If they differ, it calls ring_changed/3.

.. code-block:: erlang

   case RingID of
       LastID ->
           maybe_ensure_vnodes_started(Ring),
           State;
       _ ->
           ensure_vnodes_started(Ring),
           State2 = ring_changed(Ring, CHBin, State),
           State2#state{last_ring_id=RingID}
TODO: find out why this comparison is enough to determine whether the ring has changed.
(Postscript, 03/03): RingID is not an ID that is unique across the entire ring; it is an ID that each node keeps for itself. It is incremented whenever there is a change. Therefore, if RingID and LastID differ, a change has occurred.
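The comparison described above can be sketched as a pure function. This is only an illustration under my own assumptions: the module name ring_id_sketch is hypothetical, a map stands in for the real #state{} record, and the ring IDs are plain integers.

```erlang
-module(ring_id_sketch).
-export([maybe_ring_changed/2]).

%% Compare the current ring ID with the one remembered in State.
%% State is a map standing in for the #state{} record (hypothetical shape).
maybe_ring_changed(RingID, State = #{last_ring_id := LastID}) ->
    case RingID of
        LastID ->
            %% Same ID as last time: nothing to do.
            {unchanged, State};
        _ ->
            %% Different ID: remember it so the next tick compares against it.
            {changed, State#{last_ring_id := RingID}}
    end.
```

Since the ID only moves when the ring is modified, remembering the last value seen is enough to detect a change on the next tick.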
riak_core_vnode_manager:ring_changed/3
.. code-block:: erlang

   State2 = update_forwarding(AllVNodes, Mods, Ring, State),

   %% Update handoff state
   State3 = update_handoff(AllVNodes, Ring, CHBin, State2),

   %% Trigger ownership transfers.
   Transfers = riak_core_ring:pending_changes(Ring),
   trigger_ownership_handoff(Transfers, Mods, Ring, State3),

It updates the forwarding state and the handoff state, then calls trigger_ownership_handoff/4. "State" here means whether forwarding or handoff should be performed.
riak_core_vnode_manager:trigger_ownership_handoff/4
.. code-block:: erlang

   Throttle = limit_ownership_handoff(Transfers, IsResizing),
   Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
                             Mod <- Mods,
                             S =:= awaiting,
                             Node =:= node(),
                             not lists:member(Mod, CMods)],
   [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
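To see what the list comprehension selects, here is a self-contained sketch with made-up data. The module awaiting_sketch is hypothetical, the local node is passed in as an argument instead of calling node(), and the meaning I give to the tuple fields (index, owning node, an ignored field, completed modules, status) is inferred from the pattern alone.

```erlang
-module(awaiting_sketch).
-export([awaiting/3]).

%% Transfers: [{Idx, Node, Ignored, CompletedMods, Status}]
%% Returns every {Mod, Idx} pair that still has to be handed off
%% from LocalNode.
awaiting(Transfers, Mods, LocalNode) ->
    [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Transfers,
                   Mod <- Mods,
                   S =:= awaiting,                 % transfer not finished
                   Node =:= LocalNode,             % this node is the source
                   not lists:member(Mod, CMods)].  % module not yet done
```

For example, with transfers for indexes 0 to 3 where only 0 and 1 are awaiting on n1 and kv is already complete for index 1, the result contains kv and pipe for index 0 and only pipe for index 1.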
riak_core_vnode_manager:maybe_trigger_handoff/3
Calls riak_core_vnode:trigger_handoff/3.

.. code-block:: erlang

   case dict:find({Mod, Idx}, HO) of
       {ok, '$resize'} ->
           {ok, Ring} = riak_core_ring_manager:get_my_ring(),
           case riak_core_ring:awaiting_resize_transfer(Ring, {Idx, node()}, Mod) of
               undefined -> ok;
               {TargetIdx, TargetNode} ->
                   riak_core_vnode:trigger_handoff(Pid, TargetIdx, TargetNode)
           end;
       {ok, '$delete'} ->
           riak_core_vnode:trigger_delete(Pid);
       {ok, TargetNode} ->
           riak_core_vnode:trigger_handoff(Pid, TargetNode),
           ok;
riak_core_vnode:trigger_handoff/3
Sends the trigger_handoff event.

.. code-block:: erlang

   trigger_handoff(VNode, TargetIdx, TargetNode) ->
       gen_fsm:send_all_state_event(VNode, {trigger_handoff, TargetIdx, TargetNode}).
riak_core_vnode:active({trigger_handoff,
Calls maybe_handoff/3.

.. code-block:: erlang

   active({trigger_handoff, TargetIdx, TargetNode}, State) ->
       maybe_handoff(TargetIdx, TargetNode, State);
riak_core_vnode:maybe_handoff/3
Determines HOType, for example resize_transfer or hinted_handoff, from Resizing and Primary. It then calls start_handoff/4 with that HOType as an argument.
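The decision can be pictured as a small pure function over the two booleans. This is a sketch from my reading rather than a copy of the source: the module hotype_sketch is hypothetical, and the middle clause returning ownership_transfer is my assumption for the case that is neither a resize nor a hinted handoff.

```erlang
-module(hotype_sketch).
-export([handoff_type/2]).

%% Resizing: is the ring currently being resized?
%% Primary:  is this node a primary owner of the partition?
handoff_type(Resizing, Primary) ->
    case {Resizing, Primary} of
        {true, _}  -> resize_transfer;
        {_, true}  -> ownership_transfer;  % assumed name for this case
        {_, false} -> hinted_handoff
    end.
```

Matching on the tuple makes the priority explicit: a resize in progress wins regardless of whether the node is a primary.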
riak_core_vnode:start_handoff/4
Calls start_outbound/4.
riak_core_vnode:start_outbound/4
Calls riak_core_handoff_manager:add_outbound/7.

.. code-block:: erlang

   case riak_core_handoff_manager:add_outbound(HOType, Mod, Idx, TargetIdx, TargetNode, self(), Opts) of
       {ok, Pid} ->
           State#state{handoff_pid=Pid,
                       handoff_type=HOType,
                       handoff_target={TargetIdx, TargetNode}};
riak_core_handoff_manager:add_outbound/7
Calls send_handoff/8.

.. code-block:: erlang

   case send_handoff(Type, {Mod, SrcIdx, TargetIdx}, Node, Pid, HS, Opts) of
       {ok, Handoff=#handoff_status{transport_pid=Sender}} ->
           HS2 = HS ++ [Handoff],
           {reply, {ok, Sender}, State#state{handoffs=HS2}};
riak_core_handoff_manager:send_handoff/8
It is also called from riak_core_handoff_manager:xfer/3 and from the handle_cast clause for {send_handoff, ...}.
Inside it, it checks whether another handoff is already running (see the code around ShouldHandoff =).
Then, once it is known that the handoff can actually run, it launches the sender process and sets up a monitor on it. Specifically, it calls riak_core_handoff_sender_sup:start_sender. The filter and other options vary with the handoff type (HOType).
.. code-block:: erlang

   case HOType of
       repair ->
           HOFilter = Filter,
           HOAcc0 = undefined,
           HONotSentFun = undefined;
       resize_transfer ->
           {ok, Ring} = riak_core_ring_manager:get_my_ring(),
           HOFilter = resize_transfer_filter(Ring, Mod, Src, Target),
           HOAcc0 = ordsets:new(),
           HONotSentFun = resize_transfer_notsent_fun(Ring, Mod, Src);
       _ ->
           HOFilter = none,
           HOAcc0 = undefined,
           HONotSentFun = undefined
   end,
   HOOpts = [{filter, HOFilter},
             {notsent_acc0, HOAcc0},
             {notsent_fun, HONotSentFun}
             | BaseOpts],
   {ok, Pid} = riak_core_handoff_sender_sup:start_sender(HOType,
                                                          Mod,
                                                          Node,
                                                          Vnode,
                                                          HOOpts),
   PidM = monitor(process, Pid),
riak_core_handoff_sender_sup:start_sender
Calls start_fold/5 via start_link.
riak_core_handoff_sender:start_fold/5
First, maybe_call_handoff_started/3 makes a final check that the handoff may proceed.
Next, it actually connects to the handoff destination.
.. code-block:: erlang

       true ->
           {ok, Skt} = gen_tcp:connect(TNHandoffIP, Port, SockOpts, 15000),
           {Skt, gen_tcp}
   end,
It then sends a message.

.. code-block:: erlang

   ModBin = atom_to_binary(Module, utf8),
   Msg = <<?PT_MSG_OLDSYNC:8, ModBin/binary>>,
   ok = TcpMod:send(Socket, Msg),
It receives a sync message from the transfer destination. Receiving it means that the destination has not refused the transfer.

.. code-block:: erlang

   case TcpMod:recv(Socket, 0, RecvTimeout) of
       {ok, [?PT_MSG_OLDSYNC | <<"sync">>]} -> ok;
       {error, timeout} -> exit({shutdown, timeout});
       {error, closed} -> exit({shutdown, max_concurrency})
   end,
It checks whether the receiver supports batched messages.

.. code-block:: erlang

   RemoteSupportsBatching = remote_supports_batching(TargetNode),

It sends the partition ID.

.. code-block:: erlang

   M = <<?PT_MSG_INIT:8, TargetPartition:160/integer>>,
   ok = TcpMod:send(Socket, M),
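The INIT message is just one type byte followed by a 160-bit partition ID, which the bit syntax expresses directly. Here is a minimal round-trip sketch. The module init_msg_sketch is hypothetical, and the value 0 for PT_MSG_INIT is an assumption on my part; the real constant is defined in riak_core's header file.

```erlang
-module(init_msg_sketch).
-export([encode_init/1, decode/1]).

-define(PT_MSG_INIT, 0).  % assumed value, see the note above

%% Build the INIT message: 1 type byte + 20 bytes of partition ID.
encode_init(Partition) when is_integer(Partition), Partition >= 0 ->
    <<?PT_MSG_INIT:8, Partition:160/integer>>.

%% Decode a message by looking at its leading type byte.
decode(<<?PT_MSG_INIT:8, Partition:160/integer>>) ->
    {init, Partition};
decode(<<Type:8, Rest/binary>>) ->
    {unknown, Type, Rest}.
```

The 160-bit width matches the size of a SHA-1 hash, so any position on the ring fits in the field, and the encoded message is always 21 bytes long.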
It creates the fold request. The function visit_item/3 is passed in here, and it is visit_item/3 that actually sends the data.

.. code-block:: erlang

   Req = riak_core_util:make_fold_req(
           fun visit_item/3,
           #ho_acc{ack=0,
It calls sync_command. The comment here is also a little worrying ...

.. code-block:: erlang

   %% IFF the vnode is using an async worker to perform the fold
   %% then sync_command will return error on vnode crash,
   %% otherwise it will wait forever but vnode crash will be
   %% caught by handoff manager.  I know, this is confusing, a
   %% new handoff system will be written soon enough.

   AccRecord0 = riak_core_vnode_master:sync_command({SrcPartition, SrcNode},
                                                    Req,
                                                    VMaster, infinity),
Finally, if anything is still left in the buffer, it calls send_objects/2.

.. code-block:: erlang

   %% Send any straggler entries remaining in the buffer:
   AccRecord = send_objects(AccRecord0#ho_acc.item_queue, AccRecord0),
If there is no error, it sends a sync message and confirms that a reply comes back. Once the reply arrives, the transfer has succeeded.

.. code-block:: erlang

   lager:debug("~p ~p Sending final sync",
               [SrcPartition, Module]),
   ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),

   case TcpMod:recv(Socket, 0, RecvTimeout) of
       {ok, [?PT_MSG_SYNC | <<"sync">>]} ->
           lager:debug("~p ~p Final sync received",
                       [SrcPartition, Module]);
       {error, timeout} -> exit({shutdown, timeout})
   end,
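To convince myself how the final sync round trip works, here is a self-contained sketch over a loopback socket. It is a simplification under several assumptions: the module sync_sketch is hypothetical, the value 3 for PT_MSG_SYNC is assumed, and the sockets are opened in plain binary mode with 4-byte length framing, so the reply is matched as a binary rather than with the list pattern used in the excerpt above.

```erlang
-module(sync_sketch).
-export([run/0]).

-define(PT_MSG_SYNC, 3).  % assumed value

%% Start a throwaway receiver on a random local port, send the final
%% sync, and return ok if the receiver echoes it back.
run() ->
    Opts = [binary, {packet, 4}, {active, false}],
    {ok, LSock} = gen_tcp:listen(0, Opts),
    {ok, Port} = inet:port(LSock),
    spawn_link(fun() -> receiver(LSock) end),
    {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, Port, Opts, 5000),
    ok = gen_tcp:send(Sock, <<?PT_MSG_SYNC:8>>),
    Result = case gen_tcp:recv(Sock, 0, 5000) of
                 {ok, <<?PT_MSG_SYNC:8, "sync">>} -> ok;
                 Other -> {unexpected, Other}
             end,
    gen_tcp:close(Sock),
    gen_tcp:close(LSock),
    Result.

%% Receiver side: answer a SYNC message with SYNC followed by "sync".
receiver(LSock) ->
    {ok, Sock} = gen_tcp:accept(LSock),
    {ok, <<?PT_MSG_SYNC:8>>} = gen_tcp:recv(Sock, 0, 5000),
    ok = gen_tcp:send(Sock, <<?PT_MSG_SYNC:8, "sync">>),
    gen_tcp:close(Sock).
```

Because the sender blocks in recv until the echo arrives, a successful return tells it that the receiver has consumed everything sent before the sync.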
Finally, a resize_transfer_complete or handoff_complete event is sent depending on the type, and the process is finished.

.. code-block:: erlang

   case Type of
       repair -> ok;
       resize_transfer -> gen_fsm:send_event(ParentPid, {resize_transfer_complete,
                                                         NotSentAcc});
       _ -> gen_fsm:send_event(ParentPid, handoff_complete)
   end;
riak_core_handoff_sender:visit_item/3
visit_item has two modes: one that sends synchronously and one that sends asynchronously.
In the sync case, it sends items one at a time like this and waits for a reply (recv) after each.

.. code-block:: erlang

   case TcpMod:send(Sock, M) of
       ok ->
           case TcpMod:recv(Sock, 0, RecvTimeout) of
               {ok, [?PT_MSG_OLDSYNC | <<"sync">>]} ->
                   Acc2 = Acc#ho_acc{ack=0, error=ok, stats=Stats3},
                   visit_item(K, V, Acc2);
Otherwise, it applies the filter, decides whether to send in a batch, and then actually sends.

.. code-block:: erlang

   case Module:encode_handoff_item(K, V) of
       ...
           Acc;
       ...
           case ItemQueueByteSize2 =< HandoffBatchThreshold of
               true  -> Acc2#ho_acc{item_queue=ItemQueue2};
               false -> send_objects(ItemQueue2, Acc2)
           end;
When UseBatching is set, it checks whether the queue is still within HandoffBatchThreshold. If it is, the item is accumulated; if the threshold is exceeded, the queue is sent with send_objects/2.
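The batching rule can be tried out without any sockets. In the sketch below, the module batch_sketch is hypothetical, "sending" is simulated by collecting each flushed queue into a list, and the threshold is passed in as an argument rather than read from the application environment.

```erlang
-module(batch_sketch).
-export([fold_items/2]).

%% Fold over binary items, buffering them until the buffered byte size
%% exceeds Threshold, at which point the whole queue is flushed.
%% Returns {FlushedBatches, Leftover}, both in arrival order.
fold_items(Items, Threshold) ->
    {Queue, _Size, Sent} =
        lists:foldl(
          fun(Bin, {Q, Size, SentAcc}) ->
                  Q2 = [Bin | Q],
                  Size2 = Size + byte_size(Bin),
                  case Size2 =< Threshold of
                      true  -> {Q2, Size2, SentAcc};              % keep buffering
                      false -> {[], 0, [lists:reverse(Q2) | SentAcc]}  % flush
                  end
          end,
          {[], 0, []},
          Items),
    {lists:reverse(Sent), lists:reverse(Queue)}.
```

With a threshold of 7 bytes, the items "aaaa", "bbbb", "cc", "d" produce one flushed batch holding the first two items, while "cc" and "d" stay in the buffer. That leftover is exactly the kind of straggler that has to be sent once the fold is over.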
riak_core_handoff_sender:send_objects/2
This is the function that actually sends the data. It looks like this.

.. code-block:: erlang

   M = <<?PT_MSG_BATCH:8, ObjectList/binary>>,

   NumBytes = byte_size(M),

   Stats2 = incr_bytes(incr_objs(Stats, NObjects), NumBytes),
   Stats3 = maybe_send_status({Module, SrcPartition, TargetPartition}, Stats2),

   case TcpMod:send(Sock, M) of
Receiving side
riak_core_handoff_listener:new_connection/2
riak_core_handoff_listener implements the gen_nb_server behaviour.
When a connection is accepted, it calls riak_core_handoff_manager:add_inbound/1.

.. code-block:: erlang

   new_connection(Socket, State = #state{ssl_opts = SslOpts}) ->
       case riak_core_handoff_manager:add_inbound(SslOpts) of
           {ok, Pid} ->
               gen_tcp:controlling_process(Socket, Pid),
               ok = riak_core_handoff_receiver:set_socket(Pid, Socket),
               {ok, State};
           {error, _Reason} ->
               riak_core_stat:update(rejected_handoffs),
               gen_tcp:close(Socket),
               {ok, State}
riak_core_handoff_manager:add_inbound/1
Calls receive_handoff/1.

.. code-block:: erlang

       ...
           {reply, Error, State}
   end;
riak_core_handoff_manager:receive_handoff/1
Calls handoff_concurrency_limit_reached/0 to check the number of concurrent handoffs, then calls riak_core_handoff_receiver_sup:start_receiver to start the receiver process.

.. code-block:: erlang

           {error, max_concurrency};
       false ->
           {ok, Pid} = riak_core_handoff_receiver_sup:start_receiver(SSLOpts),
           PidM = monitor(process, Pid),

           %% successfully started up a new receiver
           {ok, #handoff_status{transport_pid=Pid,
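The shape of receive_handoff/1, a concurrency check followed by starting and monitoring a receiver, can be sketched as follows. Everything here is illustrative: the module limit_sketch is hypothetical, the running handoffs and the limit are passed in explicitly, and a bare spawned process stands in for the receiver that the supervisor would start.

```erlang
-module(limit_sketch).
-export([receive_handoff/2]).

%% Handoffs: list of handoffs currently running.
%% Limit:    maximum number allowed to run at once.
receive_handoff(Handoffs, Limit) ->
    case length(Handoffs) >= Limit of
        true ->
            %% Refuse: the caller closes the socket and counts a rejection.
            {error, max_concurrency};
        false ->
            %% Stand-in for riak_core_handoff_receiver_sup:start_receiver/1.
            Pid = spawn(fun() -> receive stop -> ok end end),
            %% Monitor the receiver so its exit can be noticed later.
            MRef = monitor(process, Pid),
            {ok, {Pid, MRef}}
    end.
```

Monitoring rather than linking means the manager receives a 'DOWN' message when a receiver dies instead of crashing along with it.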
riak_core_handoff_receiver then processes each message according to its type.
INIT
.. code-block:: erlang

   process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod,
                                                       peer=Peer}) ->
       <<Partition:160/integer>> = MsgData,
       lager:info("Receiving handoff data for partition ~p:~p from ~p", [VNodeMod, Partition, Peer]),
       {ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod),
       Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}},
               {vnode_pid, VNode}],
       riak_core_handoff_manager:set_recv_data(self(), Data),
       State#state{partition=Partition, vnode=VNode};
SYNC
.. code-block:: erlang

   process_message(?PT_MSG_SYNC, _MsgData, State=#state{sock=Socket,
                                                        tcp_mod=TcpMod}) ->
       TcpMod:send(Socket, <<?PT_MSG_SYNC:8, "sync">>),
       State;
BATCH. In fact it is treated as a collection of PT_MSG_OBJ.
.. code-block:: erlang

   process_message(?PT_MSG_BATCH, MsgData, State) ->
       lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
                   State,
                   binary_to_term(MsgData));
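The BATCH handling is easy to reproduce in isolation: the payload is a serialized list, and each element is handled as if it had arrived as its own OBJ message. In this sketch the module batch_decode_sketch is hypothetical, the sender side using term_to_binary/1 is my assumption (it is simply the counterpart of the binary_to_term/1 call above), and a caller-supplied fun stands in for process_message(?PT_MSG_OBJ, ...).

```erlang
-module(batch_decode_sketch).
-export([encode_batch/1, process_batch/3]).

%% Sender side (assumed): serialize a list of encoded objects.
encode_batch(Objs) when is_list(Objs) ->
    term_to_binary(Objs).

%% Receiver side: unpack the list and feed every object through
%% HandleObj(Obj, Acc) -> Acc1, threading the accumulator along.
process_batch(MsgData, HandleObj, Acc0) ->
    lists:foldl(HandleObj, Acc0, binary_to_term(MsgData)).
```

For instance, folding with fun(_Obj, N) -> N + 1 end over a batch of three objects counts them, much like the count field in the receiver state is incremented per object.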
OBJ. The data received here is passed to the vnode's gen_fsm as an event carrying Obj.

.. code-block:: erlang

   case gen_fsm:sync_send_all_state_event(VNode, Msg, 60000) of
       ok ->
           State#state{count=Count+1};
       E = {error, _} ->
           exit(E)
   end;
Schematic flow
TBD
riak_core_vnode_proxy
Tip
Postscript, 3/3
vnode_proxy is the place through which all requests to a vnode pass. It seems that this proxy responds on the vnode's behalf when, for example, the vnode itself cannot reply. (I have not followed the actual code, though.)
In riak_core_ring_handler.erl, maybe_start_vnode_proxies(Ring) is called.
handle_event({ring_update, Ring}, State) ->
maybe_start_vnode_proxies(Ring),
maybe_stop_vnode_proxies(Ring),
{ok, State}.
In maybe_start_vnode_proxies, proxies are started if the destination ring is larger than the current one. However, they also seem to be stopped right after that, which I do not quite understand.
riak_core_vnode_worker
Tip
Postscript, 3/3
vnode_worker apparently makes asynchronous requests possible: when a vnode has a time-consuming operation to perform, it starts a worker and lets the worker do it. (I have not followed the actual code, though.)