このエントリーをはてなブックマークに追加

riakのhandoffについて調べたこと

source code readingに向けて、riakのhandoffについて詳細を調べることとしました。

というか、処理の流れをだいたい把握するために、自分のためにまとめたものですので、たぶん他の人には役に立たないと思います。

handoffについて

ring_updateが呼び出され、ringが更新されるとnodeの再配置が起こりますので、各vnodeが担当しているringの範囲も変わってくるわけです。それに伴い、各vnodeが持っている実際のデータを新しい担当vnodeに対して送る必要があります。これをhandoffと言います。

処理の流れ

  1. riak_core_vnode_manager:schedule_management_timer/0

    send_afterでタイマーがセットされ、management_tickごとにhandle_infoが呼び出されます。

    schedule_management_timer() ->
       ManagementTick = app_helper:get_env(riak_core,
                                         vnode_management_timer,
                                         10000),
       erlang:send_after(ManagementTick, ?MODULE, management_tick).
    
  2. riak_core_vnode_manager:handle_info(management_tick, State0)

    一定間隔で呼び出されるこの関数の中でringが変わったかどうかをチェックします。

    RingID = riak_core_ring_manager:get_ring_id(),
    {ok, Ring, CHBin} = riak_core_ring_manager:get_raw_ring_chashbin(),
    State = maybe_ring_changed(RingID, Ring, CHBin, State0),
    

    maybe_ring_changedの中でring_updateを呼び出したりしています。その後は、check_repairs/1を呼び出すなどしてringをチェックして、この関数は終わります。

  3. riak_core_vnode_manager:maybe_ring_changed/4

    RingIDがlast_ring_idかどうかをチェックし、ringが変わったかどうかを判定します。違ったらring_changed/3を呼び出します。

    case RingID of
        LastID ->
            maybe_ensure_vnodes_started(Ring),
            State;
        _ ->
            ensure_vnodes_started(Ring),
            State2 = ring_changed(Ring, CHBin, State),
            State2#state{last_ring_id=RingID}
    

    TODO: なぜこれでringが変わったかどうか判断できるのか調べる。

    (03/03追記): RingIDとは、Ring全体で一意のIDではなく各nodeが各自で持っているvnodeのid。変化があればincrementされている。そのため、RingIDとLastIDが異なる、ということは、変化が起きたということになる。

  4. riak_core_vnode_manager:ring_changed/3

    State2 = update_forwarding(AllVNodes, Mods, Ring, State),
    
    %% Update handoff state
    State3 = update_handoff(AllVNodes, Ring, CHBin, State2),
    
    %% Trigger ownership transfers.
    Transfers = riak_core_ring:pending_changes(Ring),
    trigger_ownership_handoff(Transfers, Mods, Ring, State3),
    

    forwardの状態やhandoffの状態を更新し、trigger_ownership_handoff/4を呼び出します。ここでいう状態とは、forwardやhandoffをするべきかどうかなどです。

  5. riak_core_vnode_manager:trigger_ownership_handoff/4

     Throttle = limit_ownership_handoff(Transfers, IsResizing),
     Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
                               Mod <- Mods,
                               S =:= awaiting,
                               Node =:= node(),
                               not lists:member(Mod, CMods)],
    [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
    
  6. riak_core_vnode_manager:maybe_trigger_handoff/3

    riak_core_vnode:trigger_handoff/3を呼び出します。

    case dict:find({Mod, Idx}, HO) of
        {ok, '$resize'} ->
            {ok, Ring} = riak_core_ring_manager:get_my_ring(),
            case riak_core_ring:awaiting_resize_transfer(Ring, {Idx, node()}, Mod) of
                undefined -> ok;
                {TargetIdx, TargetNode} ->
                    riak_core_vnode:trigger_handoff(Pid, TargetIdx, TargetNode)
            end;
        {ok, '$delete'} ->
            riak_core_vnode:trigger_delete(Pid);
        {ok, TargetNode} ->
            riak_core_vnode:trigger_handoff(Pid, TargetNode),
            ok;
    
  7. trigger_handoff/3

    trigger_handoff eventを呼びだします。

    trigger_handoff(VNode, TargetIdx, TargetNode) ->
        gen_fsm:send_all_state_event(VNode, {trigger_handoff, TargetIdx,
        TargetNode}).
    
  8. riak_core_vnode:active({trigger_handoff,

    maybe_handoff/3を呼び出します。

    active({trigger_handoff, TargetIdx, TargetNode}, State) ->
       maybe_handoff(TargetIdx, TargetNode, State);
    
  9. riak_core_vnode:maybe_handoff/3

    ResizingとPrimaryからresize_transferやhinted_handoffかどうかを判断してHOTypeを定義。そのHOTypeを引数にして start_handoff/4を呼び出します。

  10. riak_core_vnode:start_handoff/4

    start_outbound/4を呼び出す

  11. riak_core_vnode:start_outbound/4

    riak_core_handoff_manager:add_outbound/7 を呼び出します。

    case riak_core_handoff_manager:add_outbound(HOType,Mod,Idx,TargetIdx,TargetNode,self(),Opts) of
      {ok, Pid} ->
          State#state{handoff_pid=Pid,
                      handoff_type=HOType,
                      handoff_target={TargetIdx, TargetNode}};
    
  12. riak_core_handoff_manager:add_outbound/7

    send_handoff/8を呼び出します。

    case send_handoff(Type,{Mod,SrcIdx,TargetIdx},Node,Pid,HS,Opts) of
        {ok,Handoff=#handoff_status{transport_pid=Sender}} ->
            HS2 = HS ++ [Handoff],
            {reply, {ok,Sender}, State#state{handoffs=HS2}};
    
  13. riak_core_handoff_manager:send_handoff/8

    riak_core_handoff_manager:xfer/3やhandle_cast{send_handoffからも呼び出されます。

    この中で、他にhandoffが走っていないかどうかをチェックします。(ShouldHandoff=あたり)

    そして、handoffを実際に実行していいことが分かると、monitorを建て、senderプロセスを立ち上げます。具体的には、riak_core_handoff_sender_sup:start_senderを呼び出します。この時、HandoffType(HOType)に応じてfilterなどを変えています。

    case ShouldHandoff of
        true ->
            VnodeM = monitor(process, Vnode),
            %% start the sender process
            BaseOpts = [{src_partition, Src}, {target_partition, Target}],
            case HOType of
                repair ->
                    HOFilter = Filter,
                    HOAcc0 = undefined,
                    HONotSentFun = undefined;
                resize_transfer ->
                    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
                    HOFilter = resize_transfer_filter(Ring, Mod, Src, Target),
                    HOAcc0 = ordsets:new(),
                    HONotSentFun = resize_transfer_notsent_fun(Ring, Mod, Src);
                _ ->
                    HOFilter = none,
                    HOAcc0 = undefined,
                    HONotSentFun = undefined
            end,
            HOOpts = [{filter, HOFilter},
                      {notsent_acc0, HOAcc0},
                      {notsent_fun, HONotSentFun} | BaseOpts],
            {ok, Pid} = riak_core_handoff_sender_sup:start_sender(HOType,
                                                                  Mod,
                                                                  Node,
                                                                  Vnode,
                                                                  HOOpts),
            PidM = monitor(process, Pid),
    
  14. riak_core_handoff_sender_sup:start_sender

    start_linkでstart_fold/5 を呼び出します。

  15. riak_core_hanoff_sender:start_fold/5

    最初にmaybe_call_handoff_started/3でhandoffをしていいか、最後のチェックをします。

    次に実際にhandoff先とconnectします。

    SockOpts = [binary, {packet, 4}, {header,1}, {active, false}],
    {Socket, TcpMod} =
        if SslOpts /= [] ->
                {ok, Skt} = ssl:connect(TNHandoffIP, Port, SslOpts ++ SockOpts,
                                        15000),
                {Skt, ssl};
           true ->
                {ok, Skt} = gen_tcp:connect(TNHandoffIP, Port, SockOpts, 15000),
                {Skt, gen_tcp}
        end,
    

    メッセージを送ります。

    VMaster = list_to_atom(atom_to_list(Module) ++ "_master"),
    ModBin = atom_to_binary(Module, utf8),
    Msg = <<?PT_MSG_OLDSYNC:8,ModBin/binary>>,
    ok = TcpMod:send(Socket, Msg),
    

    transfer先からsyncというメッセージを受け取ります。これを受け取るということは、transfer先が拒否していない、ということです。

    case TcpMod:recv(Socket, 0, RecvTimeout) of
        {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> ok;
        {error, timeout} -> exit({shutdown, timeout});
        {error, closed} -> exit({shutdown, max_concurrency})
    end,
    

    receiverがbatching messagesを受け取れるかチェックします。

    RemoteSupportsBatching = remote_supports_batching(TargetNode),
    

    PartitionIDを送信し、

    M = <<?PT_MSG_INIT:8,TargetPartition:160/integer>>,
    ok = TcpMod:send(Socket, M),
    

    fold_reqを作成します。ここでvisit_item/3という関数を渡していますが、このvisit_item/3という関数の中で、実際にデータを送っています。

    Req = riak_core_util:make_fold_req(
                        fun visit_item/3,
                        #ho_acc{ack=0,
    

    sync_commandを呼びます。ここで気になるコメントも…

    %% IFF the vnode is using an async worker to perform the fold
    %% then sync_command will return error on vnode crash,
    %% otherwise it will wait forever but vnode crash will be
    %% caught by handoff manager.  I know, this is confusing, a
    %% new handoff system will be written soon enough.
    
    AccRecord0 =
    riak_core_vnode_master:sync_command({SrcPartition, SrcNode},
                                                     Req,
                                                     VMaster,
                                                     infinity),
    

    最後にまだバッファーに残っていたら、send_objects/2を呼び出します。

    %% Send any straggler entries remaining in the buffer:
    AccRecord = send_objects(AccRecord0#ho_acc.item_queue, AccRecord0),
    

    エラーがなかったら、syncというメッセージを送り、返ってくることを確認します。帰って来たら、転送がうまく行った、ということです。

    lager:debug("~p ~p Sending final sync",
                [SrcPartition, Module]),
    ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
    
    case TcpMod:recv(Socket, 0, RecvTimeout) of
        {ok,[?PT_MSG_SYNC|<<"sync">>]} ->
            lager:debug("~p ~p Final sync received",
                        [SrcPartition, Module]);
        {error, timeout} -> exit({shutdown, timeout})
    end,
    

    最後に、タイプに応じてresize_transfer_completeかhandoff_completeのイベントを起動して終わりです。

        case Type of
        repair -> ok;
        resize_transfer -> gen_fsm:send_event(ParentPid,
        {resize_transfer_complete,
                                                          NotSentAcc});
        _ -> gen_fsm:send_event(ParentPid,
        handoff_complete)
    end;
    
  16. riak_core_handoff_manager:visit_item/3

    visit_itemにはsyncで送るものと、asyncで送るものの2つあります。

    syncの場合はこんなふうに一個ずつsendしてrecvしています。

    case TcpMod:send(Sock, M) of
        ok ->
            case TcpMod:recv(Sock, 0, RecvTimeout) of
                {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} ->
                    Acc2 = Acc#ho_acc{ack=0, error=ok, stats=Stats3},
                    visit_item(K, V, Acc2);
    

    そうでない場合、filterを通し、バッチで送るかどうかを判定し、実際に送ります。

    case Filter(K) of
        true ->
            case Module:encode_handoff_item(K, V) of
                corrupted ->
                    {Bucket, Key} = K,
                    lager:warning("Unreadable object ~p/~p discarded",
                                  [Bucket, Key]),
                    Acc;
                BinObj ->
                    case UseBatching of
                        true ->
                         %% 略
                         case ItemQueueByteSize2 =< HandoffBatchThreshold of
                             true  ->
                             Acc2#ho_acc{item_queue=ItemQueue2};
                             false -> send_objects(ItemQueue2,
                             Acc2)
                         end;
    

    UseBatchingの場合、HandoffBatchTrheshold以内かどうかを確認し、以内であればまとめ、超えていたらsend_objects/2を使って送ります。

  17. riak_core_handoff_manager:send_objects/2

    実際にデータを送る関数です。こんな感じですね。

    M = <<?PT_MSG_BATCH:8, ObjectList/binary>>,
    
    NumBytes = byte_size(M),
    
    Stats2 = incr_bytes(incr_objs(Stats, NObjects), NumBytes),
    Stats3 = maybe_send_status({Module, SrcPartition,
    TargetPartition}, Stats2),
    
    case TcpMod:send(Sock, M) of
        ok ->
    

受け取るとき

  1. riak_core_handoff_listener:new_connection/2

    gen_nb_server をbehaviorにしているriak_core_handoff_listenerが、

    acceptすると、riak_core_handoff_manager:add_inbound/1を呼び出します。

    new_connection(Socket, State = #state{ssl_opts = SslOpts}) ->
     case riak_core_handoff_manager:add_inbound(SslOpts) of
         {ok, Pid} ->
             gen_tcp:controlling_process(Socket, Pid),
             ok = riak_core_handoff_receiver:set_socket(Pid, Socket),
             {ok, State};
         {error, _Reason} ->
             riak_core_stat:update(rejected_handoffs),
             gen_tcp:close(Socket),
             {ok, State}
    
  2. riak_core_handoff_manager:add_inbound/1

    receive_handoff/1を呼び出します。

    handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
     case receive_handoff(SSLOpts) of
         {ok,Handoff=#handoff_status{transport_pid=Receiver}} ->
             HS2 = HS ++ [Handoff],
             {reply, {ok,Receiver}, State#state{handoffs=HS2}};
         Error ->
             {reply, Error, State}
     end;
    
  3. riak_core_handoff_manager:receive_handoff/1

    handoff_concurrency_limit_reached/0 を呼び出して、並列数を確認した後、riak_core_handoff_receiver_sup:start_receiver を呼び出し、receiver processを起動します。

    receive_handoff (SSLOpts) ->
       case handoff_concurrency_limit_reached() of
           true ->
               {error, max_concurrency};
           false ->
               {ok,Pid}=riak_core_handoff_receiver_sup:start_receiver(SSLOpts),
               PidM = monitor(process, Pid),
    
               %% successfully started up a new receiver
               {ok, #handoff_status{ transport_pid=Pid,
    
  4. riak_core_handoff_receiverにてmsgに応じた処理を行います。

    INIT

    process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod,
                                                    peer=Peer}) ->
    <<Partition:160/integer>> = MsgData,
    lager:info("Receiving handoff data for partition ~p:~p from ~p", [VNodeMod, Partition, Peer]),
    {ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod),
    Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}},
            {vnode_pid, VNode}],
    riak_core_handoff_manager:set_recv_data(self(), Data),
    State#state{partition=Partition, vnode=VNode};
    

    SYNC

    process_message(?PT_MSG_SYNC, _MsgData, State=#state{sock=Socket,
                                                     tcp_mod=TcpMod}) ->
        TcpMod:send(Socket, <<?PT_MSG_SYNC:8, "sync">>),
        State;
    

    BATCH。実際にはPT_MSG_OBJの集合として扱っています。

    process_message(?PT_MSG_BATCH, MsgData, State) ->
        lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
                    State,
                    binary_to_term(MsgData));
    

    OBJ。ここで受け取ったデータをObjとしてgen_fsmのイベントに送っています。…

    process_message(?PT_MSG_OBJ, MsgData, State=#state{vnode=VNode, count=Count}) ->
        Msg = {handoff_data, MsgData},
        case gen_fsm:sync_send_all_state_event(VNode, Msg, 60000) of
            ok ->
                State#state{count=Count+1};
            E={error, _} ->
                exit(E)
        end;
    

概略フロー

TBD

riak_core_vnode_proxy

ちなみに

3/3 追記

vnode_proxyとは、vnodeへのリクエストがすべて通る場所だそうです。もしvnodeが通知ができない場合など、このproxyが応答を行うなどをするそうです。(ただし実際のコードは追っていません)

riak_core_ring_handler.erlで、 maybe_start_vnode_proxies(Ring) が呼ばれます。

handle_event({ring_update, Ring}, State) ->
    maybe_start_vnode_proxies(Ring),
    maybe_stop_vnode_proxies(Ring),
    {ok, State}.

maybe_start_vnode_proxiesでは、変更先のringが今よりも大きい場合、proxyを建てる、ということをしています。しかし、その直後にstopもしており、ちょっと良く意味がわかりません。

riak_core_vnode_worker

ちなみに

3/3 追記

vnode_workerとはvnodeが時間がかかる操作をするときにこのworkerを立ち上げて、workerにやらせることにより、非同期でのリクエストを行えるようにするもの、だそうです。(ただし実際のコードは追っていません)