Skip to content

Commit 96be6ee

Browse files
authored
Merge pull request #186 from esl/interarrival_as_throttle
Interarrival as throttle
2 parents a7aff4f + 44d24d5 commit 96be6ee

14 files changed

+256
-285
lines changed

guides/configuration.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@ Amoc supports the following generic configuration parameters:
99
* default value - empty list (`[]`)
1010
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`
1111

12-
13-
* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
14-
for two consecutive users:
15-
* default value - 50 ms.
16-
* example: `AMOC_INTERARRIVAL="50"`
12+
* `user_rate` - a rate (implemented as a `t:amoc_throttle:rate/0`) of user processes per minute:
13+
* default value - 1200.
14+
* example: `AMOC_USER_RATE="600"`
1715
* this parameter can be updated at runtime (in the same way as scenario configuration).
1816

1917
* `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface

src/amoc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ do(Scenario, Count, Settings) ->
2525
case {amoc_controller:start_scenario(Scenario, Settings), Count} of
2626
{ok, 0} -> ok;
2727
{ok, Count} -> amoc_controller:add_users(1, Count);
28-
Error -> Error
28+
{Error, _} -> Error
2929
end;
3030
Error -> Error
3131
end.

src/amoc_controller.erl

Lines changed: 58 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,46 @@
88
-behaviour(gen_server).
99

1010
-define(SERVER, ?MODULE).
11+
-define(DEFAULT_USER_RATE, 1200).
1112

12-
-required_variable(#{name => interarrival, default_value => 50,
13-
verification => {?MODULE, non_neg_integer, 1},
14-
description => "a delay between creating the processes for two "
15-
"consecutive users (ms, def: 50ms)",
16-
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
13+
-required_variable(#{name => user_rate, default_value => ?DEFAULT_USER_RATE,
14+
verification => {?MODULE, verify_user_rate, 1},
15+
description => "Throttle rate for the Scenario:start/1,2 callback",
16+
update => {?MODULE, update_user_rate, 2}}).
1717

1818
-record(state, {scenario :: amoc:scenario() | undefined,
19+
status = idle :: status(),
1920
last_user_id = 0 :: last_user_id(),
20-
status = idle :: idle | running | terminating | finished |
21-
{error, any()} | disabled,
22-
scenario_state :: any(), %% state returned from Scenario:init/0
23-
create_users = [] :: [amoc_scenario:user_id()],
24-
tref :: timer:tref() | undefined}).
21+
scenario_state :: amoc_scenario:state() %% state returned from Scenario:init/0
22+
}).
23+
24+
-type status() :: idle | running | terminating | finished | {error, any()} | disabled.
25+
%% Scenario status.
2526

2627
-type state() :: #state{}.
2728
%% Internal state of the node's controller
29+
2830
-type handle_call_res() :: ok | {ok, term()} | {error, term()}.
2931
-type running_status() :: #{scenario := amoc:scenario(),
3032
currently_running_users := user_count(),
3133
highest_user_id := last_user_id()}.
3234
%% Details about the scenario currently running
35+
3336
-type amoc_status() :: idle |
3437
{running, running_status()} |
3538
{terminating, amoc:scenario()} |
3639
{finished, amoc:scenario()} |
3740
{error, any()} |
3841
disabled.
3942
%% Status of the node, note that amoc_controller is disabled for the master node
43+
4044
-type user_count() :: non_neg_integer().
4145
%% Number of users currently running in the node
46+
4247
-type last_user_id() :: non_neg_integer().
4348
%% Highest user id registered in the node
44-
-type interarrival() :: non_neg_integer().
49+
50+
-type user_rate() :: amoc_throttle:rate().
4551
%% Time to wait in between spawning new users
4652

4753
%% ------------------------------------------------------------------
@@ -65,14 +71,14 @@
6571
%% ------------------------------------------------------------------
6672
%% Parameters verification functions
6773
%% ------------------------------------------------------------------
68-
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
74+
-export([update_user_rate/2, verify_user_rate/1]).
6975

70-
-export([zero_users_running/0]).
76+
-export([wait_user_rate/0, zero_users_running/0]).
7177

7278
%% ------------------------------------------------------------------
7379
%% gen_server Function Exports
7480
%% ------------------------------------------------------------------
75-
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
81+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
7682

7783
%% ------------------------------------------------------------------
7884
%% API Function Definitions
@@ -122,14 +128,14 @@ disable() ->
122128
gen_server:call(?SERVER, disable).
123129

124130
%% @private
125-
-spec non_neg_integer(any()) -> boolean().
126-
non_neg_integer(Interarrival) ->
127-
is_integer(Interarrival) andalso Interarrival >= 0.
131+
-spec verify_user_rate(any()) -> boolean().
132+
verify_user_rate(UserRate) ->
133+
(is_integer(UserRate) andalso (0 =< UserRate)) orelse (infinity =:= UserRate).
128134

129135
%% @private
130-
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
131-
maybe_update_interarrival_timer(interarrival, _) ->
132-
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
136+
-spec update_user_rate(user_rate, user_rate()) -> ok.
137+
update_user_rate(user_rate, UserRate) ->
138+
ok = amoc_throttle:change_rate(user_rate, #{rate => UserRate}).
133139

134140
%% @private
135141
-spec zero_users_running() -> ok.
@@ -180,8 +186,6 @@ handle_call(_Request, _From, State) ->
180186

181187
%% @private
182188
-spec handle_cast(any(), state()) -> {noreply, state()}.
183-
handle_cast(maybe_update_interarrival_timer, State) ->
184-
{noreply, maybe_update_interarrival_timer(State)};
185189
handle_cast(zero_users_running, State) ->
186190
NewSate = handle_zero_users_running(State),
187191
{noreply, NewSate};
@@ -190,15 +194,14 @@ handle_cast(_Msg, State) ->
190194

191195
%% @private
192196
-spec handle_info(any(), state()) -> {noreply, state()}.
193-
handle_info(start_user, State) ->
194-
NewSate = handle_start_user(State),
195-
{noreply, NewSate};
196-
handle_info(start_all_users, State) ->
197-
NewSate = handle_start_all_users(State),
198-
{noreply, NewSate};
199197
handle_info(_Msg, State) ->
200198
{noreply, State}.
201199

200+
%% @private
201+
-spec terminate(term(), state()) -> any().
202+
terminate(_Reason, _State) ->
203+
amoc_users_sup:terminate_all_children().
204+
202205
%% ------------------------------------------------------------------
203206
%% internal functions
204207
%% ------------------------------------------------------------------
@@ -243,18 +246,15 @@ handle_update_settings(_Settings, #state{status = Status}) ->
243246

244247
-spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) ->
245248
{handle_call_res(), state()}.
246-
handle_add(StartId, EndId, #state{last_user_id = LastId,
247-
create_users = ScheduledUsers,
248-
status = running,
249+
handle_add(StartId, EndId, #state{status = running,
250+
last_user_id = LastId,
249251
scenario = Scenario,
250-
tref = TRef} = State) when StartId =< EndId,
251-
LastId < StartId ->
252+
scenario_state = ScenarioState} = State)
253+
when StartId =< EndId, LastId < StartId ->
252254
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
253255
#{scenario => Scenario, type => add}),
254-
NewUsers = lists:seq(StartId, EndId),
255-
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
256-
NewTRef = maybe_start_timer(TRef),
257-
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
256+
amoc_users_sup:start_children(Scenario, StartId, EndId, ScenarioState),
257+
{ok, State#state{last_user_id = EndId}};
258258
handle_add(_StartId, _EndId, #state{status = running} = State) ->
259259
{{error, invalid_range}, State};
260260
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
@@ -287,23 +287,6 @@ handle_disable(#state{status = idle} = State) ->
287287
handle_disable(#state{status = Status} = State) ->
288288
{{error, {invalid_status, Status}}, State}.
289289

290-
-spec handle_start_user(state()) -> state().
291-
handle_start_user(#state{create_users = [UserId | T],
292-
scenario = Scenario,
293-
scenario_state = ScenarioState} = State) ->
294-
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
295-
State#state{create_users = T};
296-
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
297-
State#state{tref = maybe_stop_timer(TRef)}.
298-
299-
-spec handle_start_all_users(state()) -> state().
300-
handle_start_all_users(#state{create_users = AllUsers,
301-
scenario = Scenario,
302-
scenario_state = ScenarioState,
303-
tref = TRef} = State) ->
304-
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
305-
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.
306-
307290
%% ------------------------------------------------------------------
308291
%% helpers
309292
%% ------------------------------------------------------------------
@@ -316,12 +299,15 @@ start_tables() -> %% ETS creation
316299
{ok | error, any()}.
317300
init_scenario(Scenario, Settings) ->
318301
case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of
319-
ok -> amoc_scenario:init(Scenario);
302+
ok ->
303+
start_user_rate(),
304+
amoc_scenario:init(Scenario);
320305
{error, Type, Reason} -> {error, {Type, Reason}}
321306
end.
322307

323308
-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
324309
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
310+
stop_user_rate(),
325311
amoc_scenario:terminate(Scenario, ScenarioState).
326312

327313
-spec handle_zero_users_running(state()) -> state().
@@ -331,35 +317,20 @@ handle_zero_users_running(#state{status = terminating} = State) ->
331317
handle_zero_users_running(State) ->
332318
State.
333319

334-
-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
335-
maybe_stop_timer(undefined) ->
336-
undefined;
337-
maybe_stop_timer(TRef) ->
338-
{ok, cancel} = timer:cancel(TRef),
339-
undefined.
340-
341-
-spec get_interarrival() -> interarrival().
342-
get_interarrival() ->
343-
amoc_config:get(interarrival).
344-
345-
-spec maybe_update_interarrival_timer(state()) -> state().
346-
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
347-
State;
348-
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
349-
{ok, cancel} = timer:cancel(TRef),
350-
Value = get_interarrival(),
351-
NewTRef = do_interarrival(Value),
352-
State#state{tref = NewTRef}.
353-
354-
-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
355-
maybe_start_timer(undefined) ->
356-
Value = get_interarrival(),
357-
do_interarrival(Value);
358-
maybe_start_timer(TRef) -> TRef.
359-
360-
do_interarrival(0) ->
361-
self() ! start_all_users,
362-
undefined;
363-
do_interarrival(Value) ->
364-
{ok, NewTRef} = timer:send_interval(Value, start_user),
365-
NewTRef.
320+
-spec get_user_rate() -> user_rate().
321+
get_user_rate() ->
322+
amoc_config:get(user_rate, ?DEFAULT_USER_RATE).
323+
324+
-spec wait_user_rate() -> boolean().
325+
wait_user_rate() ->
326+
infinity =:= get_user_rate()
327+
orelse ok =:= amoc_throttle:wait(user_rate).
328+
329+
-spec start_user_rate() -> any().
330+
start_user_rate() ->
331+
UserRate = get_user_rate(),
332+
amoc_throttle:start(user_rate, #{rate => UserRate}).
333+
334+
-spec stop_user_rate() -> any().
335+
stop_user_rate() ->
336+
amoc_throttle:stop(user_rate).

src/throttle/amoc_throttle.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
plan := plan()}.
6464
%% Gradual plan details. Must specify a `t:gradual/0', and a `t:plan/0'.
6565

66-
-export_type([t/0, name/0, rate/0, interval/0, gradual_plan/0]).
66+
-export_type([t/0, name/0, rate/0, interval/0, interarrival/0, gradual_plan/0]).
6767

6868
%% @doc Starts the throttle mechanism for a given `Name' with a given config.
6969
%%

src/throttle/amoc_throttle_controller.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate | Rates]} = Pla
267267
NewPlan = Plan#change_rate_plan{rates = Rates},
268268
State#{Name => Info1#throttle_info{change_plan = NewPlan}}.
269269

270+
-spec consume_all_timer_ticks(any()) -> ok.
270271
consume_all_timer_ticks(Msg) ->
271272
receive
272273
Msg -> consume_all_timer_ticks(Msg)

src/throttle/amoc_throttle_process.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,18 @@ maybe_stop_timer(#state{tref = TRef}) ->
145145
{ok, cancel} = timer:cancel(TRef),
146146
consume_all_timer_ticks(delay_between_executions).
147147

148-
timeout(#state{delay_between_executions = infinity}) ->
149-
infinity;
150-
timeout(#state{delay_between_executions = Delay}) ->
151-
Delay + ?DEFAULT_MSG_TIMEOUT.
152-
148+
-spec consume_all_timer_ticks(any()) -> ok.
153149
consume_all_timer_ticks(Msg) ->
154150
receive
155151
Msg -> consume_all_timer_ticks(Msg)
156152
after 0 -> ok
157153
end.
158154

155+
timeout(#state{delay_between_executions = infinity}) ->
156+
infinity;
157+
timeout(#state{delay_between_executions = Delay}) ->
158+
Delay + ?DEFAULT_MSG_TIMEOUT.
159+
159160
maybe_run_fn(#state{schedule = [], schedule_reversed = []} = State) ->
160161
State;
161162
maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->

src/users/amoc_user.erl

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,11 @@
33
-module(amoc_user).
44

55
%% API
6-
-export([start_link/3]).
76
-export([stop/0, stop/2]).
87
-export([init/4]).
98

109
-type state() :: term().
1110

12-
-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
13-
{ok, pid()} | {error, term()}.
14-
start_link(Scenario, Id, State) ->
15-
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).
16-
1711
-spec stop() -> ok.
1812
stop() ->
1913
stop(self(), false).
@@ -24,6 +18,7 @@ stop(Pid, Force) when is_pid(Pid) ->
2418

2519
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
2620
init(Parent, Scenario, Id, State) ->
27-
proc_lib:init_ack(Parent, {ok, self()}),
21+
amoc_controller:wait_user_rate(),
22+
amoc_users_worker_sup:user_up(Parent, Id),
2823
process_flag(trap_exit, true),
2924
amoc_scenario:start(Scenario, Id, State).

0 commit comments

Comments
 (0)