Skip to content

Commit 44d24d5

Browse files
committed
Apply review
1 parent acfdabc commit 44d24d5

File tree

5 files changed

+41
-23
lines changed

5 files changed

+41
-23
lines changed

src/amoc_controller.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,7 @@ disable() ->
130130
%% @private
131131
-spec verify_user_rate(any()) -> boolean().
132132
verify_user_rate(UserRate) ->
133-
(infinity =:= UserRate)
134-
orelse is_integer(UserRate)
135-
andalso (0 =< UserRate).
133+
(is_integer(UserRate) andalso (0 =< UserRate)) orelse (infinity =:= UserRate).
136134

137135
%% @private
138136
-spec update_user_rate(user_rate, user_rate()) -> ok.
@@ -255,7 +253,7 @@ handle_add(StartId, EndId, #state{status = running,
255253
when StartId =< EndId, LastId < StartId ->
256254
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
257255
#{scenario => Scenario, type => add}),
258-
amoc_users_sup:start_children(Scenario, {StartId, EndId}, ScenarioState),
256+
amoc_users_sup:start_children(Scenario, StartId, EndId, ScenarioState),
259257
{ok, State#state{last_user_id = EndId}};
260258
handle_add(_StartId, _EndId, #state{status = running} = State) ->
261259
{{error, invalid_range}, State};
@@ -325,7 +323,7 @@ get_user_rate() ->
325323

326324
-spec wait_user_rate() -> boolean().
327325
wait_user_rate() ->
328-
0 =:= get_user_rate()
326+
infinity =:= get_user_rate()
329327
orelse ok =:= amoc_throttle:wait(user_rate).
330328

331329
-spec start_user_rate() -> any().

src/throttle/amoc_throttle_controller.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
ensure_throttle_processes_started/2,
1212
pause/1, resume/1, stop/1, get_info/1,
1313
change_rate/2, change_rate_gradually/2,
14-
pg_scope/0, consume_all_timer_ticks/1,
14+
pg_scope/0,
1515
get_throttle_process/1,
1616
raise_event_on_slave_node/2, telemetry_event/2]).
1717

src/throttle/amoc_throttle_process.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,14 @@ maybe_stop_timer(#state{tref = undefined}) ->
143143
ok;
144144
maybe_stop_timer(#state{tref = TRef}) ->
145145
{ok, cancel} = timer:cancel(TRef),
146-
amoc_throttle_controller:consume_all_timer_ticks(delay_between_executions).
146+
consume_all_timer_ticks(delay_between_executions).
147+
148+
-spec consume_all_timer_ticks(any()) -> ok.
149+
consume_all_timer_ticks(Msg) ->
150+
receive
151+
Msg -> consume_all_timer_ticks(Msg)
152+
after 0 -> ok
153+
end.
147154

148155
timeout(#state{delay_between_executions = infinity}) ->
149156
infinity;

src/users/amoc_users_sup.erl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
%%
55
%% It spawns a pool of workers as big as online schedulers. When starting a new user, as the user is
66
%% identified by ID, a worker will be chosen for this user based on its ID
7-
%% (see `gen_sup_from_userid/1').
7+
%% (see `get_sup_from_user_id/1').
88
%%
99
%% The currently running number of users is stored in an atomic that all workers update and the
1010
%% controller can read.
@@ -17,7 +17,7 @@
1717

1818
%% API
1919
-export([handle_up_user/3, handle_down_user/2, count_no_of_users/0]).
20-
-export([start_children/3, stop_child/2, stop_children/2, terminate_all_children/0]).
20+
-export([start_children/4, stop_child/2, stop_children/2, terminate_all_children/0]).
2121
-export([distribute/2, get_all_children/0]).
2222

2323
-type count() :: non_neg_integer().
@@ -42,8 +42,7 @@
4242
start_link() ->
4343
Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, no_args),
4444
UserSups = supervisor:which_children(?MODULE),
45-
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups,
46-
is_integer(N), is_pid(Pid)],
45+
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups ],
4746
IndexedSups = lists:keysort(2, IndexedSupsUnsorted),
4847
UserSupPidsTuple = list_to_tuple([ Pid || {Pid, _} <- IndexedSups ]),
4948
SupCount = tuple_size(UserSupPidsTuple),
@@ -72,6 +71,7 @@ init(no_args) ->
7271
Strategy = #{strategy => one_for_one, intensity => 0},
7372
{ok, {Strategy, Specs}}.
7473

74+
%% We start from 2 to simplify user_count atomics management.
7575
indexes() ->
7676
lists:seq(2, erlang:system_info(schedulers_online) + 1).
7777

@@ -104,7 +104,7 @@ handle_down_user(SupNum, Pid) when SupNum > 1 ->
104104
stop_child(Pid, Force) ->
105105
case ets:lookup(?TABLE, Pid) of
106106
[Object] ->
107-
Sup = gen_sup_from_userid(Object),
107+
Sup = get_sup_from_user_id(Object),
108108
amoc_users_worker_sup:stop_children(Sup, [Pid], Force);
109109
_ ->
110110
ok
@@ -113,11 +113,11 @@ stop_child(Pid, Force) ->
113113
%% Group all children based on ID to their respective worker supervisor and cast a request with each
114114
%% group at once. This way we reduce the number of casts to each worker to always one, instead of
115115
%% depending on the number of users.
116-
-spec start_children(amoc:scenario(), {amoc_scenario:user_id(), amoc_scenario:user_id()}, any()) ->
116+
-spec start_children(amoc:scenario(), amoc_scenario:user_id(), amoc_scenario:user_id(), any()) ->
117117
ok.
118-
start_children(Scenario, {StartId, EndId}, ScenarioState) ->
118+
start_children(Scenario, StartId, EndId, ScenarioState) ->
119119
UserIds = lists:seq(StartId, EndId),
120-
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, UserIds),
120+
Assignments = maps:groups_from_list(fun get_sup_from_user_id/1, UserIds),
121121
CastFun = fun(Sup, Users) ->
122122
amoc_users_worker_sup:start_children(Sup, Scenario, Users, ScenarioState)
123123
end,
@@ -147,7 +147,7 @@ terminate_all_children() ->
147147

148148
-spec stop_children_assignments([{pid(), amoc_scenario:user_id()}], boolean()) -> ok.
149149
stop_children_assignments(Users, Force) ->
150-
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, fun get_pid/1, Users),
150+
Assignments = maps:groups_from_list(fun get_sup_from_user_id/1, fun get_pid/1, Users),
151151
CastFun = fun(Sup, Assignment) ->
152152
amoc_users_worker_sup:stop_children(Sup, Assignment, Force)
153153
end,
@@ -163,10 +163,10 @@ do_terminate_all_my_children('$end_of_table') ->
163163
ok.
164164

165165
%% Helpers
166-
-spec gen_sup_from_userid({pid(), amoc_scenario:user_id()} | amoc_scenario:user_id()) -> pid().
167-
gen_sup_from_userid({_Pid, Id}) ->
168-
gen_sup_from_userid(Id);
169-
gen_sup_from_userid(Id) ->
166+
-spec get_sup_from_user_id({pid(), amoc_scenario:user_id()} | amoc_scenario:user_id()) -> pid().
167+
get_sup_from_user_id({_Pid, Id}) ->
168+
get_sup_from_user_id(Id);
169+
get_sup_from_user_id(Id) ->
170170
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
171171
Index = erlang:phash2(Id, SupCount) + 1,
172172
element(Index, Supervisors).

test/controller_SUITE.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ all_tests() ->
4646
stop_running_scenario_with_no_users_immediately_terminates,
4747
stop_running_scenario_with_users_stays_in_finished,
4848
stop_running_scenario_with_users_eventually_terminates,
49-
user_rate_equal_zero_starts_all_users_at_once,
49+
user_rate_equal_zero_starts_no_users,
50+
user_rate_equal_infinity_starts_all_users_at_once,
5051
scenario_with_state_and_crashing_in_terminate_run_fine,
5152
scenario_missing_start_callback_fails,
5253
scenario_with_failing_init_fails
@@ -215,13 +216,25 @@ stop_running_scenario_with_users_eventually_terminates(_) ->
215216
WaitUntilValue = {finished, testing_scenario},
216217
wait_helper:wait_until(WaitUntilFun, WaitUntilValue).
217218

218-
user_rate_equal_zero_starts_all_users_at_once(_) ->
219+
user_rate_equal_zero_starts_no_users(_) ->
219220
Vars = [{user_rate, 0}, {testing_var1, def1}
220221
| test_helpers:other_vars_to_keep_quiet()],
221222
do_start_scenario(testing_scenario, Vars),
223+
amoc_controller:add_users(1, 1000),
224+
ct:sleep(100),
225+
Status = amoc_controller:get_status(),
226+
?assertMatch(
227+
{running, #{scenario := testing_scenario,
228+
currently_running_users := 0}},
229+
Status).
230+
231+
user_rate_equal_infinity_starts_all_users_at_once(_) ->
232+
Vars = [{user_rate, infinity}, {testing_var1, def1}
233+
| test_helpers:other_vars_to_keep_quiet()],
234+
do_start_scenario(testing_scenario, Vars),
222235
NumOfUsers = 1000,
223236
amoc_controller:add_users(1, NumOfUsers),
224-
Extra = #{time_left => 25, sleep_time => 5},
237+
Extra = #{time_left => 5, sleep_time => 1},
225238
test_helpers:wait_until_scenario_has_users(testing_scenario, NumOfUsers, NumOfUsers, Extra).
226239

227240
scenario_with_state_and_crashing_in_terminate_run_fine(_) ->

0 commit comments

Comments
 (0)