diff --git a/guides/configuration.md b/guides/configuration.md index 2cbb0058..de9313b6 100644 --- a/guides/configuration.md +++ b/guides/configuration.md @@ -9,11 +9,9 @@ Amoc supports the following generic configuration parameters: * default value - empty list (`[]`) * example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"` - -* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes - for two consecutive users: - * default value - 50 ms. - * example: `AMOC_INTERARRIVAL="50"` +* `user_rate` - a rate (implemented as a `t:amoc_throttle:rate/0`) of user processes per minute: + * default value - 1200. + * example: `AMOC_USER_RATE="600"` * this parameter can be updated at runtime (in the same way as scenario configuration). * `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface diff --git a/src/amoc.erl b/src/amoc.erl index b3d39b99..11d50195 100644 --- a/src/amoc.erl +++ b/src/amoc.erl @@ -25,7 +25,7 @@ do(Scenario, Count, Settings) -> case {amoc_controller:start_scenario(Scenario, Settings), Count} of {ok, 0} -> ok; {ok, Count} -> amoc_controller:add_users(1, Count); - Error -> Error + {Error, _} -> Error end; Error -> Error end. diff --git a/src/amoc_controller.erl b/src/amoc_controller.erl index 4fe35ef8..1ab10eef 100644 --- a/src/amoc_controller.erl +++ b/src/amoc_controller.erl @@ -8,28 +8,31 @@ -behaviour(gen_server). -define(SERVER, ?MODULE). +-define(DEFAULT_USER_RATE, 1200). --required_variable(#{name => interarrival, default_value => 50, - verification => {?MODULE, non_neg_integer, 1}, - description => "a delay between creating the processes for two " - "consecutive users (ms, def: 50ms)", - update => {?MODULE, maybe_update_interarrival_timer, 2}}). +-required_variable(#{name => user_rate, default_value => ?DEFAULT_USER_RATE, + verification => {?MODULE, verify_user_rate, 1}, + description => "Throttle rate for the Scenario:start/1,2 callback", + update => {?MODULE, update_user_rate, 2}}). -record(state, {scenario :: amoc:scenario() | undefined, + status = idle :: status(), last_user_id = 0 :: last_user_id(), - status = idle :: idle | running | terminating | finished | - {error, any()} | disabled, - scenario_state :: any(), %% state returned from Scenario:init/0 - create_users = [] :: [amoc_scenario:user_id()], - tref :: timer:tref() | undefined}). + scenario_state :: amoc_scenario:state() %% state returned from Scenario:init/0 + }). + +-type status() :: idle | running | terminating | finished | {error, any()} | disabled. +%% Scenario status. -type state() :: #state{}. %% Internal state of the node's controller + -type handle_call_res() :: ok | {ok, term()} | {error, term()}. -type running_status() :: #{scenario := amoc:scenario(), currently_running_users := user_count(), highest_user_id := last_user_id()}. %% Details about the scenario currently running + -type amoc_status() :: idle | {running, running_status()} | {terminating, amoc:scenario()} | @@ -37,11 +40,14 @@ {error, any()} | disabled. %% Status of the node, note that amoc_controller is disabled for the master node + -type user_count() :: non_neg_integer(). %% Number of users currently running in the node + -type last_user_id() :: non_neg_integer(). %% Highest user id registered in the node --type interarrival() :: non_neg_integer(). + +-type user_rate() :: amoc_throttle:rate(). %% Time to wait in between spawning new users %% ------------------------------------------------------------------ @@ -65,14 +71,14 @@ %% ------------------------------------------------------------------ %% Parameters verification functions %% ------------------------------------------------------------------ --export([maybe_update_interarrival_timer/2, non_neg_integer/1]). +-export([update_user_rate/2, verify_user_rate/1]). --export([zero_users_running/0]). +-export([wait_user_rate/0, zero_users_running/0]). %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ --export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% ------------------------------------------------------------------ %% API Function Definitions @@ -122,14 +128,14 @@ disable() -> gen_server:call(?SERVER, disable). %% @private --spec non_neg_integer(any()) -> boolean(). -non_neg_integer(Interarrival) -> - is_integer(Interarrival) andalso Interarrival >= 0. +-spec verify_user_rate(any()) -> boolean(). +verify_user_rate(UserRate) -> + (is_integer(UserRate) andalso (0 =< UserRate)) orelse (infinity =:= UserRate). %% @private --spec maybe_update_interarrival_timer(interarrival, term()) -> ok. -maybe_update_interarrival_timer(interarrival, _) -> - gen_server:cast(?SERVER, maybe_update_interarrival_timer). +-spec update_user_rate(user_rate, user_rate()) -> ok. +update_user_rate(user_rate, UserRate) -> + ok = amoc_throttle:change_rate(user_rate, #{rate => UserRate}). %% @private -spec zero_users_running() -> ok. @@ -180,8 +186,6 @@ handle_call(_Request, _From, State) -> %% @private -spec handle_cast(any(), state()) -> {noreply, state()}. -handle_cast(maybe_update_interarrival_timer, State) -> - {noreply, maybe_update_interarrival_timer(State)}; handle_cast(zero_users_running, State) -> NewSate = handle_zero_users_running(State), {noreply, NewSate}; @@ -190,15 +194,14 @@ handle_cast(_Msg, State) -> %% @private -spec handle_info(any(), state()) -> {noreply, state()}. -handle_info(start_user, State) -> - NewSate = handle_start_user(State), - {noreply, NewSate}; -handle_info(start_all_users, State) -> - NewSate = handle_start_all_users(State), - {noreply, NewSate}; handle_info(_Msg, State) -> {noreply, State}. +%% @private +-spec terminate(term(), state()) -> any(). +terminate(_Reason, _State) -> + amoc_users_sup:terminate_all_children(). + %% ------------------------------------------------------------------ %% internal functions %% ------------------------------------------------------------------ @@ -243,18 +246,15 @@ handle_update_settings(_Settings, #state{status = Status}) -> -spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) -> {handle_call_res(), state()}. -handle_add(StartId, EndId, #state{last_user_id = LastId, - create_users = ScheduledUsers, - status = running, +handle_add(StartId, EndId, #state{status = running, + last_user_id = LastId, scenario = Scenario, - tref = TRef} = State) when StartId =< EndId, - LastId < StartId -> + scenario_state = ScenarioState} = State) + when StartId =< EndId, LastId < StartId -> amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1}, #{scenario => Scenario, type => add}), - NewUsers = lists:seq(StartId, EndId), - NewScheduledUsers = lists:append(ScheduledUsers, NewUsers), - NewTRef = maybe_start_timer(TRef), - {ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}}; + amoc_users_sup:start_children(Scenario, StartId, EndId, ScenarioState), + {ok, State#state{last_user_id = EndId}}; handle_add(_StartId, _EndId, #state{status = running} = State) -> {{error, invalid_range}, State}; handle_add(_StartId, _EndId, #state{status = Status} = State) -> @@ -287,23 +287,6 @@ handle_disable(#state{status = idle} = State) -> handle_disable(#state{status = Status} = State) -> {{error, {invalid_status, Status}}, State}. --spec handle_start_user(state()) -> state(). -handle_start_user(#state{create_users = [UserId | T], - scenario = Scenario, - scenario_state = ScenarioState} = State) -> - amoc_users_sup:start_child(Scenario, UserId, ScenarioState), - State#state{create_users = T}; -handle_start_user(#state{create_users = [], tref = TRef} = State) -> - State#state{tref = maybe_stop_timer(TRef)}. - --spec handle_start_all_users(state()) -> state(). -handle_start_all_users(#state{create_users = AllUsers, - scenario = Scenario, - scenario_state = ScenarioState, - tref = TRef} = State) -> - amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState), - State#state{create_users = [], tref = maybe_stop_timer(TRef)}. - %% ------------------------------------------------------------------ %% helpers %% ------------------------------------------------------------------ @@ -316,12 +299,15 @@ start_tables() -> %% ETS creation {ok | error, any()}. init_scenario(Scenario, Settings) -> case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of - ok -> amoc_scenario:init(Scenario); + ok -> + start_user_rate(), + amoc_scenario:init(Scenario); {error, Type, Reason} -> {error, {Type, Reason}} end. -spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}. terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) -> + stop_user_rate(), amoc_scenario:terminate(Scenario, ScenarioState). -spec handle_zero_users_running(state()) -> state(). @@ -331,35 +317,20 @@ handle_zero_users_running(#state{status = terminating} = State) -> handle_zero_users_running(State) -> State. --spec maybe_stop_timer(timer:tref() | undefined) -> undefined. -maybe_stop_timer(undefined) -> - undefined; -maybe_stop_timer(TRef) -> - {ok, cancel} = timer:cancel(TRef), - undefined. - --spec get_interarrival() -> interarrival(). -get_interarrival() -> - amoc_config:get(interarrival). - --spec maybe_update_interarrival_timer(state()) -> state(). -maybe_update_interarrival_timer(#state{tref = undefined} = State) -> - State; -maybe_update_interarrival_timer(#state{tref = TRef} = State) -> - {ok, cancel} = timer:cancel(TRef), - Value = get_interarrival(), - NewTRef = do_interarrival(Value), - State#state{tref = NewTRef}. - --spec maybe_start_timer(timer:tref() | undefined) -> timer:tref(). -maybe_start_timer(undefined) -> - Value = get_interarrival(), - do_interarrival(Value); -maybe_start_timer(TRef) -> TRef. - -do_interarrival(0) -> - self() ! start_all_users, - undefined; -do_interarrival(Value) -> - {ok, NewTRef} = timer:send_interval(Value, start_user), - NewTRef. +-spec get_user_rate() -> user_rate(). +get_user_rate() -> + amoc_config:get(user_rate, ?DEFAULT_USER_RATE). + +-spec wait_user_rate() -> boolean(). +wait_user_rate() -> + infinity =:= get_user_rate() + orelse ok =:= amoc_throttle:wait(user_rate). + +-spec start_user_rate() -> any(). +start_user_rate() -> + UserRate = get_user_rate(), + amoc_throttle:start(user_rate, #{rate => UserRate}). + +-spec stop_user_rate() -> any(). +stop_user_rate() -> + amoc_throttle:stop(user_rate). diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index 0c4e42fc..7708f18b 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -63,7 +63,7 @@ plan := plan()}. %% Gradual plan details. Must specify a `t:gradual/0', and a `t:plan/0'. --export_type([t/0, name/0, rate/0, interval/0, gradual_plan/0]). +-export_type([t/0, name/0, rate/0, interval/0, interarrival/0, gradual_plan/0]). %% @doc Starts the throttle mechanism for a given `Name' with a given config. %% diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index ef12e0a9..ae8a00eb 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -267,6 +267,7 @@ continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate | Rates]} = Pla NewPlan = Plan#change_rate_plan{rates = Rates}, State#{Name => Info1#throttle_info{change_plan = NewPlan}}. +-spec consume_all_timer_ticks(any()) -> ok. consume_all_timer_ticks(Msg) -> receive Msg -> consume_all_timer_ticks(Msg) diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index a0a21025..23d52c83 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -145,17 +145,18 @@ maybe_stop_timer(#state{tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), consume_all_timer_ticks(delay_between_executions). -timeout(#state{delay_between_executions = infinity}) -> - infinity; -timeout(#state{delay_between_executions = Delay}) -> - Delay + ?DEFAULT_MSG_TIMEOUT. - +-spec consume_all_timer_ticks(any()) -> ok. consume_all_timer_ticks(Msg) -> receive Msg -> consume_all_timer_ticks(Msg) after 0 -> ok end. +timeout(#state{delay_between_executions = infinity}) -> + infinity; +timeout(#state{delay_between_executions = Delay}) -> + Delay + ?DEFAULT_MSG_TIMEOUT. + maybe_run_fn(#state{schedule = [], schedule_reversed = []} = State) -> State; maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) -> diff --git a/src/users/amoc_user.erl b/src/users/amoc_user.erl index 663b750a..03a61bc6 100644 --- a/src/users/amoc_user.erl +++ b/src/users/amoc_user.erl @@ -3,17 +3,11 @@ -module(amoc_user). %% API --export([start_link/3]). -export([stop/0, stop/2]). -export([init/4]). -type state() :: term(). --spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) -> - {ok, pid()} | {error, term()}. -start_link(Scenario, Id, State) -> - proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]). - -spec stop() -> ok. stop() -> stop(self(), false). @@ -24,6 +18,7 @@ stop(Pid, Force) when is_pid(Pid) -> -spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term(). init(Parent, Scenario, Id, State) -> - proc_lib:init_ack(Parent, {ok, self()}), + amoc_controller:wait_user_rate(), + amoc_users_worker_sup:user_up(Parent, Id), process_flag(trap_exit, true), amoc_scenario:start(Scenario, Id, State). diff --git a/src/users/amoc_users_sup.erl b/src/users/amoc_users_sup.erl index c513d8d7..51ebe369 100644 --- a/src/users/amoc_users_sup.erl +++ b/src/users/amoc_users_sup.erl @@ -4,7 +4,7 @@ %% %% It spawns a pool of workers as big as online schedulers. When starting a new user, as the user is %% identified by ID, a worker will be chosen for this user based on its ID -%% (see get_sup_for_user_id/1). +%% (see `get_sup_from_user_id/1'). %% %% The currently running number of users is stored in an atomic that all workers update and the %% controller can read. @@ -16,9 +16,8 @@ -export([start_link/0, init/1]). %% API --export([incr_no_of_users/1, decr_no_of_users/1, count_no_of_users/0, - start_child/3, stop_child/2, start_children/3, stop_children/2, terminate_all_children/0]). - +-export([handle_up_user/3, handle_down_user/2, count_no_of_users/0]). +-export([start_children/4, stop_child/2, stop_children/2, terminate_all_children/0]). -export([distribute/2, get_all_children/0]). -type count() :: non_neg_integer(). @@ -34,6 +33,8 @@ sups_count :: pos_integer() }). +-define(TABLE, amoc_users_sup_table). + %% Supervisor %% @private @@ -41,7 +42,7 @@ start_link() -> Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, no_args), UserSups = supervisor:which_children(?MODULE), - IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups], + IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups ], IndexedSups = lists:keysort(2, IndexedSupsUnsorted), UserSupPidsTuple = list_to_tuple([ Pid || {Pid, _} <- IndexedSups ]), SupCount = tuple_size(UserSupPidsTuple), @@ -54,6 +55,9 @@ start_link() -> %% @private -spec init(no_args) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init(no_args) -> + EtsOpts = [ordered_set, public, named_table, + {read_concurrency, true}, {write_concurrency, auto}], + _Table = ets:new(?TABLE, EtsOpts), Specs = [ #{ id => {amoc_users_worker_sup, N}, @@ -67,6 +71,7 @@ init(no_args) -> Strategy = #{strategy => one_for_one, intensity => 0}, {ok, {Strategy, Specs}}. +%% We start from 2 to simplify user_count atomics management. indexes() -> lists:seq(2, erlang:system_info(schedulers_online) + 1). @@ -76,14 +81,16 @@ count_no_of_users() -> #storage{user_count = Atomics} = persistent_term:get(?MODULE), atomics:get(Atomics, 1). --spec incr_no_of_users(non_neg_integer()) -> any(). -incr_no_of_users(SupNum) when SupNum > 1 -> +-spec handle_up_user(non_neg_integer(), pid(), amoc_scenario:user_id()) -> any(). +handle_up_user(SupNum, Pid, Id) when SupNum > 1 -> + ets:insert(?TABLE, {Pid, Id}), #storage{user_count = Atomics} = persistent_term:get(?MODULE), atomics:add(Atomics, SupNum, 1), atomics:add(Atomics, 1, 1). --spec decr_no_of_users(non_neg_integer()) -> ok. -decr_no_of_users(SupNum) when SupNum > 1 -> +-spec handle_down_user(non_neg_integer(), pid()) -> ok. +handle_down_user(SupNum, Pid) when SupNum > 1 -> + ets:delete(?TABLE, Pid), #storage{user_count = Atomics} = persistent_term:get(?MODULE), atomics:sub(Atomics, SupNum, 1), case atomics:sub_get(Atomics, 1, 1) of @@ -93,24 +100,24 @@ decr_no_of_users(SupNum) when SupNum > 1 -> ok end. --spec start_child(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok. -start_child(Scenario, Id, ScenarioState) -> - Sup = get_sup_for_user_id(Id), - amoc_users_worker_sup:start_child(Sup, Scenario, Id, ScenarioState). - -spec stop_child(pid(), boolean()) -> ok. stop_child(Pid, Force) -> - amoc_users_worker_sup:stop_child(Pid, Force). + case ets:lookup(?TABLE, Pid) of + [Object] -> + Sup = get_sup_from_user_id(Object), + amoc_users_worker_sup:stop_children(Sup, [Pid], Force); + _ -> + ok + end. %% Group all children based on ID to their respective worker supervisor and cast a request with each %% group at once. This way we reduce the number of casts to each worker to always one, instead of %% depending on the number of users. --spec start_children(amoc:scenario(), [amoc_scenario:user_id()], any()) -> ok. -start_children(Scenario, UserIds, ScenarioState) -> - State = persistent_term:get(?MODULE), - #storage{sups = Supervisors, sups_indexed = IndexedSups, sups_count = SupCount} = State, - Acc = maps:from_list([ {Sup, []} || {Sup, _} <- IndexedSups ]), - Assignments = assign_users_to_sups(SupCount, Supervisors, UserIds, Acc), +-spec start_children(amoc:scenario(), amoc_scenario:user_id(), amoc_scenario:user_id(), any()) -> + ok. +start_children(Scenario, StartId, EndId, ScenarioState) -> + UserIds = lists:seq(StartId, EndId), + Assignments = maps:groups_from_list(fun get_sup_from_user_id/1, UserIds), CastFun = fun(Sup, Users) -> amoc_users_worker_sup:start_children(Sup, Scenario, Users, ScenarioState) end, @@ -120,47 +127,53 @@ start_children(Scenario, UserIds, ScenarioState) -> %% in order to load-balance the request among all workers. -spec stop_children(non_neg_integer(), boolean()) -> non_neg_integer(). stop_children(Count, Force) -> - {CountRemove, Assignments} = assign_counts(Count), - [ amoc_users_worker_sup:stop_children(Sup, Int, Force) || {Sup, Int} <- Assignments ], - CountRemove. + Users = case ets:match_object(?TABLE, '$1', Count) of + '$end_of_table' -> + []; + {Objects, _} -> + Objects + end, + stop_children_assignments(Users, Force), + length(Users). -spec get_all_children() -> [{pid(), amoc_scenario:user_id()}]. get_all_children() -> - #storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE), - All = [ amoc_users_worker_sup:get_all_children(Sup) || {Sup, _} <- IndexedSups ], - lists:flatten(All). + ets:tab2list(?TABLE). -spec terminate_all_children() -> any(). terminate_all_children() -> - #storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE), - [ amoc_users_worker_sup:terminate_all_children(Sup) || {Sup, _} <- IndexedSups ]. + Match = ets:match_object(?TABLE, '$1', 500), + do_terminate_all_my_children(Match). + +-spec stop_children_assignments([{pid(), amoc_scenario:user_id()}], boolean()) -> ok. +stop_children_assignments(Users, Force) -> + Assignments = maps:groups_from_list(fun get_sup_from_user_id/1, fun get_pid/1, Users), + CastFun = fun(Sup, Assignment) -> + amoc_users_worker_sup:stop_children(Sup, Assignment, Force) + end, + maps:foreach(CastFun, Assignments). + +%% ets:continuation/0 type is unfortunately not exported from the ets module. +-spec do_terminate_all_my_children({[tuple()], term()} | '$end_of_table') -> ok. +do_terminate_all_my_children({Users, Continuation}) -> + stop_children_assignments(Users, true), + Match = ets:match_object(Continuation), + do_terminate_all_my_children(Match); +do_terminate_all_my_children('$end_of_table') -> + ok. %% Helpers --spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid(). -get_sup_for_user_id(Id) -> +-spec get_sup_from_user_id({pid(), amoc_scenario:user_id()} | amoc_scenario:user_id()) -> pid(). +get_sup_from_user_id({_Pid, Id}) -> + get_sup_from_user_id(Id); +get_sup_from_user_id(Id) -> #storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE), Index = erlang:phash2(Id, SupCount) + 1, element(Index, Supervisors). -%% assign which users each worker will be requested to add --spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) -> - Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}. -assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) -> - Index = erlang:phash2(Id, SupCount) + 1, - ChosenSup = element(Index, Supervisors), - Vs = maps:get(ChosenSup, Acc), - NewAcc = Acc#{ChosenSup := [Id | Vs]}, - assign_users_to_sups(SupCount, Supervisors, Ids, NewAcc); -assign_users_to_sups(_, _, [], Acc) -> - Acc. - -%% assign how many users each worker will be requested to remove, -%% taking care of the fact that worker might not have enough users. --spec assign_counts(count()) -> {count(), assignment()}. -assign_counts(Total) -> - #storage{user_count = Atomics, sups_indexed = Indexed} = persistent_term:get(?MODULE), - SupervisorsWithCounts = [ {Sup, atomics:get(Atomics, SupPos)} || {Sup, SupPos} <- Indexed ], - distribute(Total, SupervisorsWithCounts). +-spec get_pid({pid(), amoc_scenario:user_id()}) -> pid(). +get_pid({Pid, _}) -> + Pid. -spec distribute(count(), assignment()) -> {count(), assignment()}. distribute(Total, SupervisorsWithCounts) -> diff --git a/src/users/amoc_users_worker_sup.erl b/src/users/amoc_users_worker_sup.erl index 03e050a4..c12c13eb 100644 --- a/src/users/amoc_users_worker_sup.erl +++ b/src/users/amoc_users_worker_sup.erl @@ -17,13 +17,11 @@ -export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --export([start_child/4, stop_child/2, start_children/4, stop_children/3, terminate_all_children/1]). - --export([get_all_children/1]). +-export([start_children/4, user_up/2, stop_children/3]). -record(state, { index :: non_neg_integer(), - table :: ets:table(), + waiting = #{} :: #{pid() := reference()}, tasks = #{} :: #{reference() := pid()} }). -type state() :: #state{}. @@ -35,154 +33,115 @@ start_link(N) -> gen_server:start_link(?MODULE, N, []). --spec start_child(pid(), amoc:scenario(), amoc_scenario:user_id(), any()) -> ok. -start_child(Sup, Scenario, Id, ScenarioState) -> - gen_server:cast(Sup, {start_child, Scenario, Id, ScenarioState}). +-spec user_up(pid(), amoc_scenario:user_id()) -> any(). +user_up(Sup, Id) -> + erlang:send(Sup, {user_up, self(), Id}). -spec start_children(pid(), amoc:scenario(), [amoc_scenario:user_id()], any()) -> ok. start_children(Sup, Scenario, UserIds, ScenarioState) -> gen_server:cast(Sup, {start_children, Scenario, UserIds, ScenarioState}). --spec stop_children(pid(), non_neg_integer(), boolean()) -> ok. -stop_children(Sup, Count, Force) -> - gen_server:cast(Sup, {stop_children, Count, Force}). - --spec terminate_all_children(pid()) -> any(). -terminate_all_children(Sup) -> - gen_server:cast(Sup, terminate_all_children). - --spec stop_child(pid(), boolean()) -> ok. -stop_child(Pid, false) -> - exit(Pid, shutdown), - ok; -stop_child(Pid, true) -> - spawn(shutdown_and_kill_after_timeout_fun(Pid)), - ok. - --spec get_all_children(pid()) -> [{pid(), amoc_scenario:user_id()}]. -get_all_children(Sup) -> - gen_server:call(Sup, get_all_children, infinity). +-spec stop_children(pid(), [pid()], boolean()) -> ok. +stop_children(Sup, Pids, Force) -> + gen_server:cast(Sup, {stop_children, Pids, Force}). %% @private -spec init(non_neg_integer()) -> {ok, state()}. init(N) -> process_flag(trap_exit, true), - Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(N)), - Table = ets:new(Name, [ordered_set, protected, named_table]), - {ok, #state{index = N, table = Table}}. + {ok, #state{index = N}}. %% @private --spec handle_call(any(), any(), state()) -> {reply, term(), state()}. -handle_call(get_all_children, _From, #state{table = Table} = State) -> - Children = ets:tab2list(Table), - {reply, Children, State}. +-spec handle_call(any(), any(), state()) -> {reply, ok, state()}. +handle_call(_Call, _From, State) -> + {reply, ok, State}. %% @private --spec handle_cast(Request, state()) -> {noreply, state()} when - Request :: {start_child, amoc:scenario(), amoc_scenario:user_id(), amoc_scenario:state()} | - {start_children, amoc:scenario(), [amoc_scenario:user_id()], amoc_scenario:state()} | - {stop_children, non_neg_integer(), boolean()} | - terminate_all_children. -handle_cast({start_child, Scenario, Id, ScenarioState}, State) -> - do_start_child(Scenario, Id, ScenarioState, State), - {noreply, State}; +-spec handle_cast(Request, state()) -> {noreply, state()} when Request :: + {start_children, amoc:scenario(), [amoc_scenario:user_id()], amoc_scenario:state()} | + {stop_children, [pid()], boolean()}. handle_cast({start_children, Scenario, Ids, ScenarioState}, State) -> - [ do_start_child(Scenario, Id, ScenarioState, State) || Id <- Ids], - {noreply, State}; -handle_cast({stop_children, 0, _}, State) -> - {noreply, State}; -handle_cast({stop_children, Int, ForceRemove}, #state{table = Table} = State) -> - Pids = case ets:match_object(Table, '$1', Int) of - '$end_of_table' -> - []; - {Objects, _} -> - [Pid || {Pid, _Id} <- Objects] - end, - NewState = maybe_track_task_to_stop_my_children(State, Pids, ForceRemove), + NewState = do_start_children(State, Scenario, Ids, ScenarioState), {noreply, NewState}; -handle_cast(terminate_all_children, State) -> - NewState = do_terminate_all_my_children(State), +handle_cast({stop_children, Pids, ForceRemove}, State) -> + NewState = do_stop_children(State, Pids, ForceRemove), {noreply, NewState}; -handle_cast(_Msg, State) -> +handle_cast(_Info, State) -> {noreply, State}. %% @private -spec handle_info(Request, state()) -> {noreply, state()} when - Request :: {child_up, pid(), amoc_scenario:user_id()} | - {'DOWN', reference(), process, pid(), term()} | - {'EXIT', pid(), term()}. -handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{tasks = Tasks} = State) -> + Request :: {user_up, pid(), amoc_scenario:user_id()} | + {'EXIT', pid(), term()} | + {_, reference(), process, pid(), term()}. +handle_info({user_up, Pid, Id}, State) -> + NewState = handle_up_user(State, Pid, Id), + {noreply, NewState}; +handle_info({user_down, _Ref, process, Pid, _Reason}, + #state{waiting = Waiting} = State) -> + NewState = handle_down_user(State, Pid), + {noreply, NewState#state{waiting = maps:remove(Pid, Waiting)}}; +handle_info({task_down, Ref, process, _Pid, _Reason}, #state{tasks = Tasks} = State) -> {noreply, State#state{tasks = maps:remove(Ref, Tasks)}}; -handle_info({'EXIT', Pid, _Reason}, #state{index = N, table = Table} = State) -> - handle_down_user(Table, Pid, N), - {noreply, State}; handle_info(_Info, State) -> {noreply, State}. %% @private --spec terminate(term(), state()) -> state(). -terminate(_Reason, State) -> - do_terminate_all_my_children(State). +-spec terminate(term(), state()) -> any(). +terminate(_Reason, #state{waiting = Waiting, tasks = Tasks}) -> + {TaskPid, TaskRef} = shutdown_and_kill_after_timeout(maps:keys(Waiting)), + maps:foreach(fun flush_task/2, Tasks#{TaskRef => TaskPid}). %% Helpers +-spec do_start_children(state(), amoc:scenario(), [amoc_scenario:user_id()], term()) -> state(). +do_start_children(#state{waiting = Waiting} = State, Scenario, Ids, ScenarioState) -> + Fun = fun(Id, Acc) -> + {Pid, Ref} = user_up(Scenario, Id, ScenarioState), + [{Pid, Ref} | Acc] + end, + NewWaiting = lists:foldl(Fun, [], Ids), + State#state{waiting = maps:merge(Waiting, maps:from_list(NewWaiting))}. + +-spec user_up(amoc:scenario(), amoc_scenario:user_id(), state()) -> {pid(), reference()}. +user_up(Scenario, Id, State) -> + Args = [self(), Scenario, Id, State], + Opts = [link, {monitor, [{tag, user_down}]}], + proc_lib:spawn_opt(amoc_user, init, Args, Opts). + +-spec handle_up_user(state(), pid(), amoc_scenario:user_id()) -> state(). +handle_up_user(#state{index = SupNum, waiting = Waiting} = State, Pid, Id) -> + amoc_users_sup:handle_up_user(SupNum, Pid, Id), + State#state{waiting = maps:remove(Pid, Waiting)}. + +-spec handle_down_user(state(), pid()) -> state(). +handle_down_user(#state{index = SupNum} = State, Pid) -> + amoc_users_sup:handle_down_user(SupNum, Pid), + State. --spec do_start_child(module(), amoc_scenario:user_id(), term(), state()) -> any(). -do_start_child(Scenario, Id, ScenarioState, #state{index = N, table = Table}) -> - case amoc_user:start_link(Scenario, Id, ScenarioState) of - {ok, Pid} -> - handle_up_user(Table, Pid, Id, N); - _ -> - ok - end. - --spec handle_up_user(ets:table(), pid(), amoc_scenario:user_id(), non_neg_integer()) -> any(). -handle_up_user(Table, Pid, Id, SupNum) -> - ets:insert(Table, {Pid, Id}), - amoc_users_sup:incr_no_of_users(SupNum). - --spec handle_down_user(ets:table(), pid(), non_neg_integer()) -> ok. -handle_down_user(Table, Pid, SupNum) -> - ets:delete(Table, Pid), - amoc_users_sup:decr_no_of_users(SupNum). +-spec flush_task(reference(), pid()) -> ok. +flush_task(Ref, Pid) -> + receive {task_down, Ref, process, Pid, _} -> ok after 0 -> ok end. %% @doc Stop a list of users in parallel. %% We don't want to ever block the supervisor on `timer:sleep/1' so we spawn that async. %% However we don't want free processes roaming around, we want monitoring that can be traced. --spec maybe_track_task_to_stop_my_children(state(), [pid()], boolean()) -> state(). -maybe_track_task_to_stop_my_children(State, [], _) -> - State; -maybe_track_task_to_stop_my_children(State, Pids, false) -> +-spec do_stop_children(state(), [pid()], boolean()) -> state(). +do_stop_children(State, Pids, false) -> [ exit(Pid, shutdown) || Pid <- Pids ], State; -maybe_track_task_to_stop_my_children(#state{tasks = Tasks} = State, Pids, true) -> - {Pid, Ref} = spawn_monitor(shutdown_and_kill_after_timeout_fun(Pids)), - State#state{tasks = Tasks#{Pid => Ref}}. +do_stop_children(#state{tasks = Tasks} = State, Pids, true) -> + {Pid, Ref} = shutdown_and_kill_after_timeout(Pids), + State#state{tasks = Tasks#{Ref => Pid}}. --spec shutdown_and_kill_after_timeout_fun(pid() | [pid()]) -> fun(() -> term()). -shutdown_and_kill_after_timeout_fun([_ | _] = Pids) -> +-spec shutdown_and_kill_after_timeout_fun([pid()]) -> fun(() -> term()). +shutdown_and_kill_after_timeout_fun(Pids) -> fun() -> [ exit(Pid, shutdown) || Pid <- Pids ], timer:sleep(?SHUTDOWN_TIMEOUT), [ exit(Pid, kill) || Pid <- Pids ] - end; -shutdown_and_kill_after_timeout_fun(Pid) when is_pid(Pid) -> - fun() -> - exit(Pid, shutdown), - timer:sleep(?SHUTDOWN_TIMEOUT), - exit(Pid, kill) end. --spec do_terminate_all_my_children(state()) -> state(). -do_terminate_all_my_children(#state{table = Table} = State) -> - Match = ets:match_object(Table, '$1', 200), - do_terminate_all_my_children(State, Match). - -%% ets:continuation/0 type is unfortunately not exported from the ets module. --spec do_terminate_all_my_children(state(), {[tuple()], term()} | '$end_of_table') -> state(). -do_terminate_all_my_children(State, {Objects, Continuation}) -> - Pids = [Pid || {Pid, _Id} <- Objects], - NewState = maybe_track_task_to_stop_my_children(State, Pids, true), - Match = ets:match_object(Continuation), - do_terminate_all_my_children(NewState, Match); -do_terminate_all_my_children(State, '$end_of_table') -> - State. +-spec shutdown_and_kill_after_timeout([pid()]) -> {pid(), reference()}. +shutdown_and_kill_after_timeout(Pids) -> + Fun = shutdown_and_kill_after_timeout_fun(Pids), + erlang:spawn_opt(Fun, [{monitor, [{tag, task_down}]}]). diff --git a/test/amoc_SUITE.erl b/test/amoc_SUITE.erl index dab640d1..dd103e34 100644 --- a/test/amoc_SUITE.erl +++ b/test/amoc_SUITE.erl @@ -1,15 +1,17 @@ -module(amoc_SUITE). --include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). -compile([export_all, nowarn_export_all]). all() -> [ + stop_kill_all_users, bad_config_fails_to_start, start_with_no_users, start_with_some_users, start_and_add_some_users, + start_and_then_force_remove_some_users_removes_first_ids, start_and_then_force_remove_some_users, start_and_then_soft_remove_some_users, start_and_then_force_remove_more_users_than_running, @@ -37,9 +39,18 @@ end_per_testcase(_, _Config) -> %% test cases %%----------------------------------------------------------------------------------- +stop_kill_all_users(_) -> + ?assertEqual(ok, amoc_do(testing_scenario, 5)), + test_helpers:wait_until_scenario_has_users(testing_scenario, 5, 5), + Users = amoc_users_sup:get_all_children(), + application:stop(amoc), + Pred = fun({Pid, _}) -> not erlang:is_process_alive(Pid) end, + WaitUntilFun = fun() -> lists:all(Pred, Users) end, + wait_helper:wait_until(WaitUntilFun, true). + bad_config_fails_to_start(_) -> Ret = amoc_do(testing_scenario, 0, []), - ?assertMatch({{error, _}, 0}, Ret). + ?assertMatch({error, _}, Ret). start_with_no_users(_) -> Ret = amoc_do(testing_scenario, 0), @@ -58,6 +69,14 @@ start_and_add_some_users(_) -> amoc:add(1), test_helpers:wait_until_scenario_has_users(testing_scenario, 1, 1). +start_and_then_force_remove_some_users_removes_first_ids(_) -> + Ret = amoc_do(testing_scenario, 50), + ?assertEqual(ok, Ret), + test_helpers:wait_until_scenario_has_users(testing_scenario, 50, 50), + Removed = amoc:remove(10, true), + ?assertEqual({ok, 10}, Removed), + test_helpers:wait_until_scenario_has_users(testing_scenario, 40, 50). + start_and_then_force_remove_some_users(_) -> Ret = amoc_do(testing_scenario, 2), ?assertEqual(ok, Ret), @@ -109,7 +128,7 @@ start_and_then_stop_cannot_rerun(_) -> Status = amoc:stop(), ?assertMatch(ok, Status), Retry = amoc_do(testing_scenario, 1), - ?assertMatch({{error, {invalid_status, _}}, 1}, Retry). + ?assertMatch({error, {invalid_status, _}}, Retry). after_reset_can_run_again(_) -> Ret = amoc_do(testing_scenario, 1), diff --git a/test/amoc_config_env_SUITE.erl b/test/amoc_config_env_SUITE.erl index 5eec559a..cdd16ba0 100644 --- a/test/amoc_config_env_SUITE.erl +++ b/test/amoc_config_env_SUITE.erl @@ -1,4 +1,5 @@ -module(amoc_config_env_SUITE). +-compile([export_all, nowarn_export_all]). -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -11,8 +12,6 @@ unset_os_env/1, set_empty_os_env/1]). --compile(export_all). - -define(MOCK_MOD, custom_parser). all() -> diff --git a/test/amoc_config_scenario_SUITE.erl b/test/amoc_config_scenario_SUITE.erl index 6ab04ba3..c31e63d9 100644 --- a/test/amoc_config_scenario_SUITE.erl +++ b/test/amoc_config_scenario_SUITE.erl @@ -94,7 +94,7 @@ end_per_testcase(_, Config) -> parse_scenario_settings(_) -> mock_ets_tables(), ets:insert(configurable_modules, {amoc_controller, configurable}), - ScenarioSettings = [{interarrival, 500}, + ScenarioSettings = [{user_rate, 500}, {config_scenario_var1, def1}], Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, ScenarioSettings), ?assertEqual(ok, Ret), @@ -114,7 +114,7 @@ parse_scenario_settings(_) -> %% overwritten variable ?assertEqual(val2, amoc_config:get(config_scenario_var2)), %% configurable module variable (defined in amoc_controller) - ?assertEqual(500, amoc_config:get(interarrival)). + ?assertEqual(500, amoc_config:get(user_rate)). update_settings(_) -> set_initial_configuration(), diff --git a/test/controller_SUITE.erl b/test/controller_SUITE.erl index 18a3b54c..fd0338ca 100644 --- a/test/controller_SUITE.erl +++ b/test/controller_SUITE.erl @@ -46,7 +46,8 @@ all_tests() -> stop_running_scenario_with_no_users_immediately_terminates, stop_running_scenario_with_users_stays_in_finished, stop_running_scenario_with_users_eventually_terminates, - interarrival_equal_zero_starts_all_users_at_once, + user_rate_equal_zero_starts_no_users, + user_rate_equal_infinity_starts_all_users_at_once, scenario_with_state_and_crashing_in_terminate_run_fine, scenario_missing_start_callback_fails, scenario_with_failing_init_fails @@ -62,6 +63,7 @@ end_per_suite(Config) -> init_per_testcase(_TestCase, Config) -> application:ensure_all_started(amoc), + amoc_cluster:set_master_node(node()), Config. end_per_testcase(_TestCase, Config) -> @@ -214,12 +216,25 @@ stop_running_scenario_with_users_eventually_terminates(_) -> WaitUntilValue = {finished, testing_scenario}, wait_helper:wait_until(WaitUntilFun, WaitUntilValue). -interarrival_equal_zero_starts_all_users_at_once(_) -> - Vars = [{interarrival, 0}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()], +user_rate_equal_zero_starts_no_users(_) -> + Vars = [{user_rate, 0}, {testing_var1, def1} + | test_helpers:other_vars_to_keep_quiet()], + do_start_scenario(testing_scenario, Vars), + amoc_controller:add_users(1, 1000), + ct:sleep(100), + Status = amoc_controller:get_status(), + ?assertMatch( + {running, #{scenario := testing_scenario, + currently_running_users := 0}}, + Status). + +user_rate_equal_infinity_starts_all_users_at_once(_) -> + Vars = [{user_rate, infinity}, {testing_var1, def1} + | test_helpers:other_vars_to_keep_quiet()], do_start_scenario(testing_scenario, Vars), NumOfUsers = 1000, amoc_controller:add_users(1, NumOfUsers), - Extra = #{time_left => 25, sleep_time => 5}, + Extra = #{time_left => 5, sleep_time => 1}, test_helpers:wait_until_scenario_has_users(testing_scenario, NumOfUsers, NumOfUsers, Extra). scenario_with_state_and_crashing_in_terminate_run_fine(_) -> diff --git a/test/test_helpers.erl b/test/test_helpers.erl index 47bc35f9..b7e02d0b 100644 --- a/test/test_helpers.erl +++ b/test/test_helpers.erl @@ -3,7 +3,7 @@ -compile([export_all, nowarn_export_all]). wait_until_scenario_has_users(Scenario, Current, HighestId) -> - wait_until_scenario_has_users(Scenario, Current, HighestId, #{}). + wait_until_scenario_has_users(Scenario, Current, HighestId, #{time_left => timer:seconds(1)}). wait_until_scenario_has_users(Scenario, Current, HighestId, ExtraConfig) -> WaitUntilFun = fun amoc_controller:get_status/0, @@ -13,18 +13,18 @@ wait_until_scenario_has_users(Scenario, Current, HighestId, ExtraConfig) -> wait_helper:wait_until(WaitUntilFun, WaitUntilValue, ExtraConfig). all_vars() -> - [{interarrival, 1}, {testing_var1, def1}, + [{user_rate, 60000}, {testing_var1, def1}, {config_scenario_var1, unused_value}]. regular_vars() -> - [{interarrival, 1}, {testing_var1, def1}]. + [{user_rate, 60000}, {testing_var1, def1}]. all_vars_with_state() -> - [{interarrival, 1}, {testing_state_var1, def1}, + [{user_rate, 60000}, {testing_state_var1, def1}, {config_scenario_var1, unused_value}]. regular_vars_with_state() -> - [{interarrival, 1}, {testing_state_var1, def1}]. + [{user_rate, 60000}, {testing_state_var1, def1}]. other_vars_to_keep_quiet() -> [{config_scenario_var1, unused_value}].