riakのhandoffについて調べたこと¶
source code readingに向けて、riakのhandoffについて詳細を調べることとし ました。
というか、処理の流れをだいたい把握するために、自分のためにまとめたもの ですので、たぶん他の人には役に立たないと思います。
handoffについて¶
ring_updateが呼び出され、ringが更新されるとnodeの再配置が起こりますので、 各vnodeが担当しているringの範囲も変わってくるわけです。それに伴い、各 vnodeが持っている実際のデータを新しい担当vnodeに対して送る必要がありま す。これをhandoffと言います。
処理の流れ¶
riak_core_vnode_manager:schedule_management_timer/0
send_afterでタイマーがセットされ、management_tickごとにhandle_infoが呼び出されます。
schedule_management_timer() -> ManagementTick = app_helper:get_env(riak_core, vnode_management_timer, 10000), erlang:send_after(ManagementTick, ?MODULE, management_tick).
riak_core_vnode_manager:handle_info(management_tick, State0)
一定間隔で呼び出されるこの関数の中でringが変わったかどうかをチェックします。
RingID = riak_core_ring_manager:get_ring_id(), {ok, Ring, CHBin} = riak_core_ring_manager:get_raw_ring_chashbin(), State = maybe_ring_changed(RingID, Ring, CHBin, State0),
maybe_ring_changedの中でring_updateを呼び出したりしています。 その後は、check_repairs/1を呼び出すなどしてringをチェックして、この関数は終わります。
riak_core_vnode_manager:maybe_ring_changed/4
RingIDがlast_ring_idかどうかをチェックし、ringが変わったかどうかを 判定します。違ったらring_changed/3を呼び出します。
case RingID of LastID -> maybe_ensure_vnodes_started(Ring), State; _ -> ensure_vnodes_started(Ring), State2 = ring_changed(Ring, CHBin, State), State2#state{last_ring_id=RingID}
TODO: なぜこれでringが変わったかどうか判断できるのか調べる。
(03/03追記): RingIDとは、Ring全体で一意のID ではなく 各nodeが各自 で持っているvnodeのid。変化があればincrementされている。そのため、 RingIDとLastIDが異なる、ということは、変化が起きたということになる。
riak_core_vnode_manager:ring_changed/3
State2 = update_forwarding(AllVNodes, Mods, Ring, State), %% Update handoff state State3 = update_handoff(AllVNodes, Ring, CHBin, State2), %% Trigger ownership transfers. Transfers = riak_core_ring:pending_changes(Ring), trigger_ownership_handoff(Transfers, Mods, Ring, State3),
forwardの状態やhandoffの状態を更新し、trigger_ownership_handoff/4を 呼び出します。ここでいう状態とは、forwardやhandoffをするべきかどうかなどです。
riak_core_vnode_manager:trigger_ownership_handoff/4
Throttle = limit_ownership_handoff(Transfers, IsResizing), Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle, Mod <- Mods, S =:= awaiting, Node =:= node(), not lists:member(Mod, CMods)], [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
riak_core_vnode_manager:maybe_trigger_handoff/3
riak_core_vnode:trigger_handoff/3を呼び出します。
case dict:find({Mod, Idx}, HO) of {ok, '$resize'} -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), case riak_core_ring:awaiting_resize_transfer(Ring, {Idx, node()}, Mod) of undefined -> ok; {TargetIdx, TargetNode} -> riak_core_vnode:trigger_handoff(Pid, TargetIdx, TargetNode) end; {ok, '$delete'} -> riak_core_vnode:trigger_delete(Pid); {ok, TargetNode} -> riak_core_vnode:trigger_handoff(Pid, TargetNode), ok;
trigger_handoff/3
trigger_handoff eventを呼びだします。
trigger_handoff(VNode, TargetIdx, TargetNode) -> gen_fsm:send_all_state_event(VNode, {trigger_handoff, TargetIdx, TargetNode}).
riak_core_vnode:active({trigger_handoff,
maybe_handoff/3を呼び出します。
active({trigger_handoff, TargetIdx, TargetNode}, State) -> maybe_handoff(TargetIdx, TargetNode, State);
riak_core_vnode:maybe_handoff/3
ResizingとPrimaryからresize_transferやhinted_handoffかどうかを判断してHOTypeを定義。 そのHOTypeを引数にして start_handoff/4を呼び出します。
riak_core_vnode:start_handoff/4
start_outbound/4を呼び出す
riak_core_vnode:start_outbound/4
riak_core_handoff_manager:add_outbound/7 を呼び出します。
case riak_core_handoff_manager:add_outbound(HOType,Mod,Idx,TargetIdx,TargetNode,self(),Opts) of {ok, Pid} -> State#state{handoff_pid=Pid, handoff_type=HOType, handoff_target={TargetIdx, TargetNode}};
riak_core_handoff_manager:add_outbound/7
send_handoff/8を呼び出します。
case send_handoff(Type,{Mod,SrcIdx,TargetIdx},Node,Pid,HS,Opts) of {ok,Handoff=#handoff_status{transport_pid=Sender}} -> HS2 = HS ++ [Handoff], {reply, {ok,Sender}, State#state{handoffs=HS2}};
riak_core_handoff_manager:send_handoff/8
riak_core_handoff_manager:xfer/3やhandle_cast{send_handoffからも呼び出されます。
この中で、他にhandoffが走っていないかどうかをチェックします。(ShouldHandoff=あたり)
そして、handoffを実際に実行していいことが分かると、monitorを建て、senderプロセスを立ち上げます。具体的には、riak_core_handoff_sender_sup:start_senderを呼び出します。 この時、HandoffType(HOType)に応じてfilterなどを変えています。
case ShouldHandoff of true -> VnodeM = monitor(process, Vnode), %% start the sender process BaseOpts = [{src_partition, Src}, {target_partition, Target}], case HOType of repair -> HOFilter = Filter, HOAcc0 = undefined, HONotSentFun = undefined; resize_transfer -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), HOFilter = resize_transfer_filter(Ring, Mod, Src, Target), HOAcc0 = ordsets:new(), HONotSentFun = resize_transfer_notsent_fun(Ring, Mod, Src); _ -> HOFilter = none, HOAcc0 = undefined, HONotSentFun = undefined end, HOOpts = [{filter, HOFilter}, {notsent_acc0, HOAcc0}, {notsent_fun, HONotSentFun} | BaseOpts], {ok, Pid} = riak_core_handoff_sender_sup:start_sender(HOType, Mod, Node, Vnode, HOOpts), PidM = monitor(process, Pid),
riak_core_handoff_sender_sup:start_sender
start_linkでstart_fold/5 を呼び出します。
riak_core_hanoff_sender:start_fold/5
最初にmaybe_call_handoff_started/3でhandoffをしていいか、最後のチェックをします。
次に実際にhandoff先とconnectします。
SockOpts = [binary, {packet, 4}, {header,1}, {active, false}], {Socket, TcpMod} = if SslOpts /= [] -> {ok, Skt} = ssl:connect(TNHandoffIP, Port, SslOpts ++ SockOpts, 15000), {Skt, ssl}; true -> {ok, Skt} = gen_tcp:connect(TNHandoffIP, Port, SockOpts, 15000), {Skt, gen_tcp} end,
メッセージを送ります。
VMaster = list_to_atom(atom_to_list(Module) ++ "_master"), ModBin = atom_to_binary(Module, utf8), Msg = <<?PT_MSG_OLDSYNC:8,ModBin/binary>>, ok = TcpMod:send(Socket, Msg),
transfer先からsyncというメッセージを受け取ります。これを受け取るとい うことは、transfer先が拒否していない、ということです。
case TcpMod:recv(Socket, 0, RecvTimeout) of {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> ok; {error, timeout} -> exit({shutdown, timeout}); {error, closed} -> exit({shutdown, max_concurrency}) end,
receiverがbatching messagesを受け取れるかチェックします。
RemoteSupportsBatching = remote_supports_batching(TargetNode),
PartitionIDを送信し、
M = <<?PT_MSG_INIT:8,TargetPartition:160/integer>>, ok = TcpMod:send(Socket, M),
fold_reqを作成します。ここでvisit_item/3という関数を渡していますが、 このvisit_item/3という関数の中で、実際にデータを送っています。
Req = riak_core_util:make_fold_req( fun visit_item/3, #ho_acc{ack=0,
sync_commandを呼びます。ここで気になるコメントも…
%% IFF the vnode is using an async worker to perform the fold %% then sync_command will return error on vnode crash, %% otherwise it will wait forever but vnode crash will be %% caught by handoff manager. I know, this is confusing, a %% new handoff system will be written soon enough. AccRecord0 = riak_core_vnode_master:sync_command({SrcPartition, SrcNode}, Req, VMaster, infinity),
最後にまだバッファーに残っていたら、send_objects/2を呼び出します。
%% Send any straggler entries remaining in the buffer: AccRecord = send_objects(AccRecord0#ho_acc.item_queue, AccRecord0),
エラーがなかったら、syncというメッセージを送り、返ってくることを確認 します。帰って来たら、転送がうまく行った、ということです。
lager:debug("~p ~p Sending final sync", [SrcPartition, Module]), ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>), case TcpMod:recv(Socket, 0, RecvTimeout) of {ok,[?PT_MSG_SYNC|<<"sync">>]} -> lager:debug("~p ~p Final sync received", [SrcPartition, Module]); {error, timeout} -> exit({shutdown, timeout}) end,
最後に、タイプに応じてresize_transfer_completeかhandoff_completeのイベントを起動して終わりです。
case Type of repair -> ok; resize_transfer -> gen_fsm:send_event(ParentPid, {resize_transfer_complete, NotSentAcc}); _ -> gen_fsm:send_event(ParentPid, handoff_complete) end;
riak_core_handoff_manager:visit_item/3
visit_itemにはsyncで送るものと、asyncで送るものの2つあります。
syncの場合はこんなふうに一個ずつsendしてrecvしています。
case TcpMod:send(Sock, M) of ok -> case TcpMod:recv(Sock, 0, RecvTimeout) of {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> Acc2 = Acc#ho_acc{ack=0, error=ok, stats=Stats3}, visit_item(K, V, Acc2);
そうでない場合、filterを通し、バッチで送るかどうかを判定し、実際に送ります。
case Filter(K) of true -> case Module:encode_handoff_item(K, V) of corrupted -> {Bucket, Key} = K, lager:warning("Unreadable object ~p/~p discarded", [Bucket, Key]), Acc; BinObj -> case UseBatching of true -> %% 略 case ItemQueueByteSize2 =< HandoffBatchThreshold of true -> Acc2#ho_acc{item_queue=ItemQueue2}; false -> send_objects(ItemQueue2, Acc2) end;
UseBatchingの場合、HandoffBatchTrheshold以内かどうかを確認し、以内で あればまとめ、超えていたらsend_objects/2を使って送ります。
riak_core_handoff_manager:send_objects/2
実際にデータを送る関数です。こんな感じですね。
M = <<?PT_MSG_BATCH:8, ObjectList/binary>>, NumBytes = byte_size(M), Stats2 = incr_bytes(incr_objs(Stats, NObjects), NumBytes), Stats3 = maybe_send_status({Module, SrcPartition, TargetPartition}, Stats2), case TcpMod:send(Sock, M) of ok ->
受け取るとき¶
riak_core_handoff_listener:new_connection/2
- gen_nb_server をbehaviorにしているriak_core_handoff_listenerが、
acceptすると、riak_core_handoff_manager:add_inbound/1を呼び出します。
new_connection(Socket, State = #state{ssl_opts = SslOpts}) -> case riak_core_handoff_manager:add_inbound(SslOpts) of {ok, Pid} -> gen_tcp:controlling_process(Socket, Pid), ok = riak_core_handoff_receiver:set_socket(Pid, Socket), {ok, State}; {error, _Reason} -> riak_core_stat:update(rejected_handoffs), gen_tcp:close(Socket), {ok, State}
riak_core_handoff_manager:add_inbound/1
receive_handoff/1を呼び出します。
handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) -> case receive_handoff(SSLOpts) of {ok,Handoff=#handoff_status{transport_pid=Receiver}} -> HS2 = HS ++ [Handoff], {reply, {ok,Receiver}, State#state{handoffs=HS2}}; Error -> {reply, Error, State} end;
riak_core_handoff_manager:receive_handoff/1
handoff_concurrency_limit_reached/0 を呼び出して、並列数を確認した後、riak_core_handoff_receiver_sup:start_receiver を呼び出し、receiver processを起動します。
receive_handoff (SSLOpts) -> case handoff_concurrency_limit_reached() of true -> {error, max_concurrency}; false -> {ok,Pid}=riak_core_handoff_receiver_sup:start_receiver(SSLOpts), PidM = monitor(process, Pid), %% successfully started up a new receiver {ok, #handoff_status{ transport_pid=Pid,
riak_core_handoff_receiver にてmsgに応じた処理を行います。
INIT
process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod, peer=Peer}) -> <<Partition:160/integer>> = MsgData, lager:info("Receiving handoff data for partition ~p:~p from ~p", [VNodeMod, Partition, Peer]), {ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod), Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}}, {vnode_pid, VNode}], riak_core_handoff_manager:set_recv_data(self(), Data), State#state{partition=Partition, vnode=VNode};
SYNC
process_message(?PT_MSG_SYNC, _MsgData, State=#state{sock=Socket, tcp_mod=TcpMod}) -> TcpMod:send(Socket, <<?PT_MSG_SYNC:8, "sync">>), State;
BATCH。実際にはPT_MSG_OBJの集合として扱っています。
process_message(?PT_MSG_BATCH, MsgData, State) -> lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end, State, binary_to_term(MsgData));
OBJ。ここで受け取ったデータをObjとしてgen_fsmのイベントに送っています。…
process_message(?PT_MSG_OBJ, MsgData, State=#state{vnode=VNode, count=Count}) -> Msg = {handoff_data, MsgData}, case gen_fsm:sync_send_all_state_event(VNode, Msg, 60000) of ok -> State#state{count=Count+1}; E={error, _} -> exit(E) end;
概略フロー¶
TBD
riak_core_vnode_proxy¶
Tip
3/3 追記
vnode_proxyとは、vnodeへのリクエストがすべて通る場所だそうです。もし vnodeが通知ができない場合など、このproxyが応答を行うなどをするそうで す。(ただし実際のコードは追っていません)
riak_core_ring_handler.erlで、 maybe_start_vnode_proxies(Ring) が呼ば れます。
handle_event({ring_update, Ring}, State) ->
maybe_start_vnode_proxies(Ring),
maybe_stop_vnode_proxies(Ring),
{ok, State}.
maybe_start_vnode_proxiesでは、変更先のringが今よりも大きい場合、proxyを建てる、ということをしています。しかし、その直後にstopもしており、ちょっと良く意味がわかりません。
riak_core_vnode_worker¶
Tip
3/3 追記
vnode_workerとはvnodeが時間がかかる操作をするときにこのworkerを立ち 上げて、workerにやらせることにより、非同期でのリクエストを行えるよう にするもの、だそうです。(ただし実際のコードは追っていません)
Comments
comments powered by Disqus