Skip to content

Commit 513b3eb

Browse files
committed
Convert interarrival into throttle mechanism
Have the controller simply requesting the user supervisors to start all the users at once, and then have each user process await for throttle permission.
1 parent 8e60d9c commit 513b3eb

File tree

6 files changed

+63
-92
lines changed

6 files changed

+63
-92
lines changed

guides/configuration.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ Amoc supports the following generic configuration parameters:
99
* default value - empty list (`[]`)
1010
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`
1111

12-
13-
* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
14-
for two consecutive users:
15-
* default value - 50 ms.
16-
* example: `AMOC_INTERARRIVAL="50"`
12+
* `interarrival` - a throttle rate (in units per millisecond) between creating the processes
13+
for two consecutive users, or 0 for no throttling:
14+
* default value - {1200, 60000}, i.e., a new user every 50ms.
15+
* example: `AMOC_INTERARRIVAL="{1200, 60000}"`
1716
* this parameter can be updated at runtime (in the same way as scenario configuration).
1817

1918
* `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface

src/amoc_controller.erl

Lines changed: 49 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,19 @@
88
-behaviour(gen_server).
99

1010
-define(SERVER, ?MODULE).
11+
-define(DEFAULT_INTERARRIVAL, 1200).
1112

12-
-required_variable(#{name => interarrival, default_value => 50,
13-
verification => {?MODULE, non_neg_integer, 1},
14-
description => "a delay between creating the processes for two "
15-
"consecutive users (ms, def: 50ms)",
16-
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
13+
-required_variable(#{name => interarrival, default_value => ?DEFAULT_INTERARRIVAL,
14+
verification => {?MODULE, verify_interarrival, 1},
15+
description => "Throttle rate for the Scenario:start/1,2 callback (def: 50ms)",
16+
update => {?MODULE, update_interarrival_rate, 2}}).
1717

1818
-record(state, {scenario :: amoc:scenario() | undefined,
1919
last_user_id = 0 :: last_user_id(),
2020
status = idle :: idle | running | terminating | finished |
2121
{error, any()} | disabled,
22-
scenario_state :: any(), %% state returned from Scenario:init/0
23-
create_users = [] :: [amoc_scenario:user_id()],
24-
tref :: timer:tref() | undefined}).
22+
scenario_state :: amoc_scenario:state() %% state returned from Scenario:init/0
23+
}).
2524

2625
-type state() :: #state{}.
2726
%% Internal state of the node's controller
@@ -41,7 +40,7 @@
4140
%% Number of users currently running in the node
4241
-type last_user_id() :: non_neg_integer().
4342
%% Highest user id registered in the node
44-
-type interarrival() :: non_neg_integer().
43+
-type interarrival() :: amoc_throttle:rate() | amoc_throttle:throttle().
4544
%% Time to wait in between spawning new users
4645

4746
%% ------------------------------------------------------------------
@@ -65,9 +64,9 @@
6564
%% ------------------------------------------------------------------
6665
%% Parameters verification functions
6766
%% ------------------------------------------------------------------
68-
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
67+
-export([update_interarrival_rate/2, verify_interarrival/1]).
6968

70-
-export([zero_users_running/0]).
69+
-export([get_interarrival/0, zero_users_running/0]).
7170

7271
%% ------------------------------------------------------------------
7372
%% gen_server Function Exports
@@ -122,14 +121,22 @@ disable() ->
122121
gen_server:call(?SERVER, disable).
123122

124123
%% @private
125-
-spec non_neg_integer(any()) -> boolean().
126-
non_neg_integer(Interarrival) ->
127-
is_integer(Interarrival) andalso Interarrival >= 0.
124+
-spec verify_interarrival(any()) -> boolean().
125+
verify_interarrival(infinity) ->
126+
true;
127+
verify_interarrival(Rate)
128+
when is_integer(Rate), Rate >= 0 ->
129+
true;
130+
verify_interarrival(#{rate := Rate, interval := Interval})
131+
when is_integer(Rate), Rate >= 0, is_integer(Interval), Interval > 0 ->
132+
true;
133+
verify_interarrival(_) ->
134+
false.
128135

129136
%% @private
130-
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
131-
maybe_update_interarrival_timer(interarrival, _) ->
132-
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
137+
-spec update_interarrival_rate(interarrival, amoc_throttle:throttle()) -> ok.
138+
update_interarrival_rate(interarrival, #{rate := Rate, interval := Interval}) ->
139+
ok = amoc_throttle:change_rate(interarrival, Rate, Interval).
133140

134141
%% @private
135142
-spec zero_users_running() -> ok.
@@ -180,8 +187,6 @@ handle_call(_Request, _From, State) ->
180187

181188
%% @private
182189
-spec handle_cast(any(), state()) -> {noreply, state()}.
183-
handle_cast(maybe_update_interarrival_timer, State) ->
184-
{noreply, maybe_update_interarrival_timer(State)};
185190
handle_cast(zero_users_running, State) ->
186191
NewSate = handle_zero_users_running(State),
187192
{noreply, NewSate};
@@ -190,12 +195,6 @@ handle_cast(_Msg, State) ->
190195

191196
%% @private
192197
-spec handle_info(any(), state()) -> {noreply, state()}.
193-
handle_info(start_user, State) ->
194-
NewSate = handle_start_user(State),
195-
{noreply, NewSate};
196-
handle_info(start_all_users, State) ->
197-
NewSate = handle_start_all_users(State),
198-
{noreply, NewSate};
199198
handle_info(_Msg, State) ->
200199
{noreply, State}.
201200

@@ -244,17 +243,14 @@ handle_update_settings(_Settings, #state{status = Status}) ->
244243
-spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) ->
245244
{handle_call_res(), state()}.
246245
handle_add(StartId, EndId, #state{last_user_id = LastId,
247-
create_users = ScheduledUsers,
248246
status = running,
249247
scenario = Scenario,
250-
tref = TRef} = State) when StartId =< EndId,
248+
scenario_state = ScenarioState} = State) when StartId =< EndId,
251249
LastId < StartId ->
252250
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
253251
#{scenario => Scenario, type => add}),
254-
NewUsers = lists:seq(StartId, EndId),
255-
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
256-
NewTRef = maybe_start_timer(TRef),
257-
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
252+
amoc_users_sup:start_children(Scenario, lists:seq(StartId, EndId), ScenarioState),
253+
{ok, State#state{last_user_id = EndId}};
258254
handle_add(_StartId, _EndId, #state{status = running} = State) ->
259255
{{error, invalid_range}, State};
260256
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
@@ -287,23 +283,6 @@ handle_disable(#state{status = idle} = State) ->
287283
handle_disable(#state{status = Status} = State) ->
288284
{{error, {invalid_status, Status}}, State}.
289285

290-
-spec handle_start_user(state()) -> state().
291-
handle_start_user(#state{create_users = [UserId | T],
292-
scenario = Scenario,
293-
scenario_state = ScenarioState} = State) ->
294-
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
295-
State#state{create_users = T};
296-
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
297-
State#state{tref = maybe_stop_timer(TRef)}.
298-
299-
-spec handle_start_all_users(state()) -> state().
300-
handle_start_all_users(#state{create_users = AllUsers,
301-
scenario = Scenario,
302-
scenario_state = ScenarioState,
303-
tref = TRef} = State) ->
304-
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
305-
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.
306-
307286
%% ------------------------------------------------------------------
308287
%% helpers
309288
%% ------------------------------------------------------------------
@@ -316,12 +295,15 @@ start_tables() -> %% ETS creation
316295
{ok | error, any()}.
317296
init_scenario(Scenario, Settings) ->
318297
case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of
319-
ok -> amoc_scenario:init(Scenario);
298+
ok ->
299+
start_interarrival(),
300+
amoc_scenario:init(Scenario);
320301
{error, Type, Reason} -> {error, {Type, Reason}}
321302
end.
322303

323304
-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
324305
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
306+
stop_interarrival(),
325307
amoc_scenario:terminate(Scenario, ScenarioState).
326308

327309
-spec handle_zero_users_running(state()) -> state().
@@ -331,35 +313,22 @@ handle_zero_users_running(#state{status = terminating} = State) ->
331313
handle_zero_users_running(State) ->
332314
State.
333315

334-
-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
335-
maybe_stop_timer(undefined) ->
336-
undefined;
337-
maybe_stop_timer(TRef) ->
338-
{ok, cancel} = timer:cancel(TRef),
339-
undefined.
340-
341-
-spec get_interarrival() -> interarrival().
316+
-spec get_interarrival() -> infinity | interarrival().
342317
get_interarrival() ->
343-
amoc_config:get(interarrival).
344-
345-
-spec maybe_update_interarrival_timer(state()) -> state().
346-
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
347-
State;
348-
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
349-
{ok, cancel} = timer:cancel(TRef),
350-
Value = get_interarrival(),
351-
NewTRef = do_interarrival(Value),
352-
State#state{tref = NewTRef}.
353-
354-
-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
355-
maybe_start_timer(undefined) ->
356-
Value = get_interarrival(),
357-
do_interarrival(Value);
358-
maybe_start_timer(TRef) -> TRef.
359-
360-
do_interarrival(0) ->
361-
self() ! start_all_users,
362-
undefined;
363-
do_interarrival(Value) ->
364-
{ok, NewTRef} = timer:send_interval(Value, start_user),
365-
NewTRef.
318+
amoc_config:get(interarrival, ?DEFAULT_INTERARRIVAL).
319+
320+
-spec start_interarrival() -> any().
321+
start_interarrival() ->
322+
case get_interarrival() of
323+
infinity ->
324+
amoc_throttle:start(interarrival, 1);
325+
#{rate := Rate, interval := Interval} ->
326+
Config = #{rate => Rate, interval => Interval},
327+
amoc_throttle:start(interarrival, Config);
328+
Rate ->
329+
amoc_throttle:start(interarrival, Rate)
330+
end.
331+
332+
-spec stop_interarrival() -> any().
333+
stop_interarrival() ->
334+
amoc_throttle:stop(interarrival).

src/amoc_scenario.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ start(Scenario, Id, State) ->
9191
{false, false} ->
9292
exit("the scenario module must export either start/2 or start/1 function")
9393
end,
94+
infinity =:= amoc_controller:get_interarrival() orelse amoc_throttle:wait(interarrival),
9495
telemetry:span([amoc, scenario, start], Metadata, Span).
9596

9697
%% ------------------------------------------------------------------

test/amoc_config_scenario_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ end_per_testcase(_, Config) ->
9494
parse_scenario_settings(_) ->
9595
mock_ets_tables(),
9696
ets:insert(configurable_modules, {amoc_controller, configurable}),
97-
ScenarioSettings = [{interarrival, 500},
97+
ScenarioSettings = [{interarrival, #{rate => 12000, interval => 60000}},
9898
{config_scenario_var1, def1}],
9999
Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, ScenarioSettings),
100100
?assertEqual(ok, Ret),
@@ -114,7 +114,7 @@ parse_scenario_settings(_) ->
114114
%% overwritten variable
115115
?assertEqual(val2, amoc_config:get(config_scenario_var2)),
116116
%% configurable module variable (defined in amoc_controller)
117-
?assertEqual(500, amoc_config:get(interarrival)).
117+
?assertEqual(#{rate => 12000, interval => 60000}, amoc_config:get(interarrival)).
118118

119119
update_settings(_) ->
120120
set_initial_configuration(),

test/controller_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ end_per_suite(Config) ->
6262

6363
init_per_testcase(_TestCase, Config) ->
6464
application:ensure_all_started(amoc),
65+
amoc_cluster:set_master_node(node()),
6566
Config.
6667

6768
end_per_testcase(_TestCase, Config) ->
@@ -215,7 +216,8 @@ stop_running_scenario_with_users_eventually_terminates(_) ->
215216
wait_helper:wait_until(WaitUntilFun, WaitUntilValue).
216217

217218
interarrival_equal_zero_starts_all_users_at_once(_) ->
218-
Vars = [{interarrival, 0}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()],
219+
Vars = [{interarrival, infinity}, {testing_var1, def1}
220+
| test_helpers:other_vars_to_keep_quiet()],
219221
do_start_scenario(testing_scenario, Vars),
220222
NumOfUsers = 1000,
221223
amoc_controller:add_users(1, NumOfUsers),

test/test_helpers.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@ wait_until_scenario_has_users(Scenario, Current, HighestId, ExtraConfig) ->
1313
wait_helper:wait_until(WaitUntilFun, WaitUntilValue, ExtraConfig).
1414

1515
all_vars() ->
16-
[{interarrival, 1}, {testing_var1, def1},
16+
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1},
1717
{config_scenario_var1, unused_value}].
1818

1919
regular_vars() ->
20-
[{interarrival, 1}, {testing_var1, def1}].
20+
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1}].
2121

2222
all_vars_with_state() ->
23-
[{interarrival, 1}, {testing_state_var1, def1},
23+
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1},
2424
{config_scenario_var1, unused_value}].
2525

2626
regular_vars_with_state() ->
27-
[{interarrival, 1}, {testing_state_var1, def1}].
27+
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1}].
2828

2929
other_vars_to_keep_quiet() ->
3030
[{config_scenario_var1, unused_value}].

0 commit comments

Comments
 (0)