Skip to content

Commit c3b18d5

Browse files
committed
Upgrade all throttle calls to use only explicit config maps
1 parent df4b9e4 commit c3b18d5

File tree

4 files changed

+141
-120
lines changed

4 files changed

+141
-120
lines changed

src/throttle/amoc_throttle.erl

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,43 +3,44 @@
33
-module(amoc_throttle).
44

55
%% API
6-
-export([start/2, start/3, start/4, stop/1,
7-
send/2, send/3, send_and_wait/2, wait/1,
6+
-export([start/2, stop/1,
7+
send/2, send/3, wait/1,
88
run/2, pause/1, resume/1,
99
change_rate/2, change_rate/3,
1010
change_rate_gradually/2, change_rate_gradually/6]).
1111

12-
-deprecated([
13-
{start, 3, "use start/2 with a config"},
14-
{start, 4, "use start/2 with a config"},
15-
{send_and_wait, 2, "use wait/1 instead"}
16-
]).
17-
1812
-type name() :: atom().
19-
-type rate() :: pos_integer().
13+
%% Atom representing the name of the throttle.
14+
-type rate() :: infinity | non_neg_integer().
15+
%% Number of events per given `t:interval/0', or infinity for effectively unlocking all throttling.
16+
%% Note that a rate of zero means effectively pausing the throttle.
17+
-type interarrival() :: infinity | non_neg_integer().
18+
%% Time in milliseconds between two events, or infinity for effectively pausing the throttle. Note
19+
%% that an interarrival of zero means effectively unlocking all throttling.
2020
-type interval() :: non_neg_integer().
21-
%% In milliseconds, defaults to 60000 (one minute) when not given.
22-
%% An interval of 0 means no delay at all, only the number of simultaneous executions will be
23-
%% controlled, which corresponds to the number of processes started
24-
-type throttle() :: #{rate := rate(),
25-
interval := interval()}.
26-
-type interarrival() :: #{interarrival := non_neg_integer()}.
21+
%% In milliseconds, defaults to 60000 (one minute).
22+
-type throttle() :: #{rate := rate(), interval := interval()} |
23+
#{interarrival := interarrival()}.
2724
%% Throttle unit of measurement
2825
-type config() :: #{rate := rate(),
2926
interval => interval(),
3027
parallelism => non_neg_integer()}
3128
| #{interarrival := non_neg_integer(),
3229
parallelism => non_neg_integer()}.
33-
%% Literal throttle configuration. It can state `interarrival', in milliseconds,
34-
%% in which case the rate per interval is calculated to allow one event every given milliseconds,
35-
%% or, literally give the rate per interval.
30+
%% Literal throttle configuration.
3631

37-
-type gradual_rate_config() :: #{from_rate := rate(),
38-
to_rate := rate(),
32+
-type gradual_rate_config() :: #{from_rate := non_neg_integer(),
33+
to_rate := non_neg_integer(),
3934
interval => interval(),
4035
step_interval => pos_integer(),
4136
step_size => pos_integer(),
4237
step_count => pos_integer(),
38+
duration => pos_integer()} |
39+
#{from_interarrival := interarrival(),
40+
to_interarrival := interarrival(),
41+
step_interval => pos_integer(),
42+
step_size => pos_integer(),
43+
step_count => pos_integer(),
4344
duration => pos_integer()}.
4445
%% Configuration for a gradual throttle rate change
4546
%%
@@ -50,31 +51,17 @@
5051

5152
-export_type([name/0, rate/0, interval/0, throttle/0, config/0, gradual_rate_config/0]).
5253

53-
%% @see start/4
54+
%% @doc Starts the throttle mechanism for a given `Name' with a given config.
55+
%%
56+
%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10).
57+
%% `Name' is needed to identify the rate as a single test can have different rates for different tasks.
58+
%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity.
5459
-spec start(name(), config() | rate()) -> {ok, started | already_started} | {error, any()}.
5560
start(Name, #{} = Config) ->
5661
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config);
5762
start(Name, Rate) ->
5863
amoc_throttle_controller:ensure_throttle_processes_started(Name, #{rate => Rate}).
5964

60-
%% @see start/4
61-
-spec start(name(), rate(), non_neg_integer()) -> {ok, started | already_started} | {error, any()}.
62-
start(Name, Rate, Interval) ->
63-
Config = #{rate => Rate, interval => Interval},
64-
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config).
65-
66-
%% @doc Starts the throttle mechanism for a given `Name' with a given `Rate' per `Interval'.
67-
%%
68-
%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10).
69-
%% `Name' is needed to identify the rate as a single test can have different rates for different tasks.
70-
%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity.
71-
%% It also accepts a special value of `0' which limits the number of parallel executions associated with `Name' to `Rate'.
72-
-spec start(name(), rate(), interval(), pos_integer()) ->
73-
{ok, started | already_started} | {error, any()}.
74-
start(Name, Rate, Interval, NoOfProcesses) ->
75-
Config = #{rate => Rate, interval => Interval, parallelism => NoOfProcesses},
76-
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config).
77-
7865
%% @doc Pauses executions for the given `Name' as if `Rate' was set to `0'.
7966
%%
8067
%% Does not stop the scheduled rate changes.
@@ -127,8 +114,6 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, StepCo
127114
%%
128115
%% `Fn' is executed in the context of a new process spawned on the same node on which
129116
%% the process executing `run/2' runs, so a call to `run/2' is non-blocking.
130-
%% This function is used internally by both `send' and `send_and_wait/2' functions,
131-
%% so all those actions will be limited to the same rate when called with the same `Name'.
132117
%%
133118
%% Diagram showing function execution flow in distributed environment,
134119
%% generated using https://sequencediagram.org/:
@@ -178,13 +163,6 @@ send(Name, Msg) ->
178163
send(Name, Pid, Msg) ->
179164
amoc_throttle_runner:throttle(Name, {Pid, Msg}).
180165

181-
%% @doc Sends and receives the given message `Msg'.
182-
%%
183-
%% Deprecated in favour of `wait/1'
184-
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
185-
send_and_wait(Name, _) ->
186-
amoc_throttle_runner:throttle(Name, wait).
187-
188166
%% @doc Blocks the caller until the throttle mechanism allows.
189167
-spec wait(name()) -> ok | {error, any()}.
190168
wait(Name) ->

src/throttle/amoc_throttle_controller.erl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
-define(DEFAULT_STEP_SIZE, 1).
2626
-define(DEFAULT_INTERVAL, 60000). %% one minute
2727
-define(DEFAULT_NO_PROCESSES, 10).
28+
-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)).
2829
-define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)).
2930
-define(POS_INT(N), (is_integer(N) andalso N > 0)).
3031

@@ -51,8 +52,8 @@
5152
-type config() :: #{rate := amoc_throttle:rate(),
5253
interval := amoc_throttle:interval(),
5354
parallelism := non_neg_integer()}.
54-
-type gradual_rate_change() :: #{from_rate := amoc_throttle:rate(),
55-
to_rate := amoc_throttle:rate(),
55+
-type gradual_rate_change() :: #{from_rate := non_neg_integer(),
56+
to_rate := non_neg_integer(),
5657
interval := amoc_throttle:interval(),
5758
step_interval := pos_integer(),
5859
step_size := pos_integer(),
@@ -71,26 +72,32 @@ start_link() ->
7172
{ok, started | already_started} |
7273
{error, invalid_throttle | wrong_reconfiguration | wrong_no_of_procs}.
7374
ensure_throttle_processes_started(
74-
Name, #{interarrival := EveryMs} = Config)
75-
when is_atom(Name), ?NONNEG_INT(EveryMs) ->
75+
Name, #{interarrival := Interarrival} = Config)
76+
when is_atom(Name), ?TIMEOUT(Interarrival) ->
7677
raise_event_on_slave_node(Name, init),
77-
Config1 = #{rate => ?DEFAULT_INTERVAL div EveryMs, interval => ?DEFAULT_INTERVAL},
78+
Config1 = #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL},
7879
Config2 = Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)},
7980
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config2});
8081
ensure_throttle_processes_started(
8182
Name, #{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config)
82-
when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) ->
83+
when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) ->
8384
raise_event_on_slave_node(Name, init),
8485
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config});
8586
ensure_throttle_processes_started(
8687
Name, #{rate := Rate, interval := Interval} = Config)
87-
when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval) ->
88+
when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval) ->
8889
raise_event_on_slave_node(Name, init),
8990
Config1 = Config#{parallelism => ?DEFAULT_NO_PROCESSES},
9091
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
92+
ensure_throttle_processes_started(
93+
Name, #{rate := Rate, parallelism := NoOfProcesses} = Config)
94+
when is_atom(Name), ?TIMEOUT(Rate), ?POS_INT(NoOfProcesses) ->
95+
raise_event_on_slave_node(Name, init),
96+
Config1 = Config#{interval => ?DEFAULT_INTERVAL},
97+
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
9198
ensure_throttle_processes_started(
9299
Name, #{rate := Rate} = Config)
93-
when is_atom(Name), ?POS_INT(Rate) ->
100+
when is_atom(Name), ?TIMEOUT(Rate) ->
94101
raise_event_on_slave_node(Name, init),
95102
Config1 = Config#{interval => ?DEFAULT_INTERVAL, parallelism => ?DEFAULT_NO_PROCESSES},
96103
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
@@ -147,7 +154,7 @@ init([]) ->
147154
From :: {pid(), Tag :: term()}, state()) ->
148155
{reply, {ok, started | already_started}, state()} |
149156
{reply, {error, wrong_reconfiguration | wrong_no_of_procs}, state()};
150-
({pause | resume | stop}, From :: {pid(), Tag :: term()}, state()) ->
157+
({pause | resume | unlock | stop}, From :: {pid(), Tag :: term()}, state()) ->
151158
{reply, ok, state()} |
152159
{reply, Error :: any(), state()};
153160
({change_rate, name(), amoc_throttle:rate(), amoc_throttle:interval()},
@@ -175,8 +182,8 @@ handle_call({pause, Name}, _From, State) ->
175182
Error ->
176183
{reply, Error, State}
177184
end;
178-
handle_call({resume, Name}, _From, State) ->
179-
case run_in_all_processes(Name, resume) of
185+
handle_call({Op, Name}, _From, State) when unlock =:= Op; resume =:= Op ->
186+
case run_in_all_processes(Name, Op) of
180187
ok ->
181188
Info = maps:get(Name, State),
182189
{reply, ok, State#{Name => Info#throttle_info{active = true}}};
@@ -266,6 +273,7 @@ continue_plan(Name, State, Info, Plan) ->
266273
State#{Name => Info#throttle_info{rate = NewRate, change_plan = NewPlan}}.
267274

268275
-spec rate_per_minute(amoc_throttle:rate(), amoc_throttle:interval()) -> amoc_throttle:rate().
276+
rate_per_minute(infinity, _) -> infinity;
269277
rate_per_minute(_, 0) -> 0;
270278
rate_per_minute(Rate, Interval) ->
271279
(Rate * 60000) div Interval.
@@ -275,7 +283,7 @@ start_processes(Name, #{rate := Rate, interval := Interval, parallelism := NoOfP
275283
raise_event(Name, init),
276284
RatePerMinute = rate_per_minute(Rate, Interval),
277285
report_rate(Name, RatePerMinute),
278-
RealNoOfProcs = min(Rate, NoOfProcesses),
286+
RealNoOfProcs = expected_no_of_processes(Rate, NoOfProcesses),
279287
start_throttle_processes(Name, Interval, Rate, RealNoOfProcs),
280288
#throttle_info{rate = Rate, interval = Interval, active = true, no_of_procs = RealNoOfProcs}.
281289

@@ -334,7 +342,7 @@ run_in_all_processes(Name, Cmd) ->
334342

335343
verify_new_start_matches_running(Name, Config, Group, State) ->
336344
#{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config,
337-
ExpectedNoOfProcesses = min(Rate, NoOfProcesses),
345+
ExpectedNoOfProcesses = expected_no_of_processes(Rate, NoOfProcesses),
338346
case {length(Group), State} of
339347
{ExpectedNoOfProcesses, #{Name := #throttle_info{rate = Rate, interval = Interval}}} ->
340348
{reply, {ok, already_started}, State};
@@ -344,6 +352,11 @@ verify_new_start_matches_running(Name, Config, Group, State) ->
344352
{reply, {error, wrong_no_of_procs}, State}
345353
end.
346354

355+
expected_no_of_processes(0, NoOfProcesses) ->
356+
min(1, NoOfProcesses);
357+
expected_no_of_processes(Rate, NoOfProcesses) ->
358+
min(Rate, NoOfProcesses).
359+
347360
run_cmd(Pid, stop) ->
348361
amoc_throttle_process:stop(Pid);
349362
run_cmd(Pid, pause) ->

src/throttle/amoc_throttle_process.erl

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute
3131

3232
-record(state, {can_run_fn = true :: boolean(),
33-
pause = false :: boolean(),
34-
max_n :: non_neg_integer(),
33+
status = running :: running | paused,
34+
max_n :: infinity | non_neg_integer(),
3535
name :: atom(),
36-
n :: integer(),
36+
n :: infinity | non_neg_integer(),
3737
interval = 0 :: amoc_throttle:interval(), %%ms
38-
delay_between_executions = 0 :: non_neg_integer(), %%ms
38+
delay_between_executions = 0 :: timeout(), %%ms
3939
tref :: timer:tref() | undefined,
4040
schedule = [] :: [AmocThrottleRunnerProcess :: pid()],
4141
schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}).
@@ -120,9 +120,9 @@ handle_info(timeout, State) ->
120120
handle_cast(stop_process, State) ->
121121
{stop, normal, State};
122122
handle_cast(pause_process, State) ->
123-
{noreply, State#state{pause = true}, {continue, maybe_run_fn}};
123+
{noreply, State#state{status = paused}, {continue, maybe_run_fn}};
124124
handle_cast(resume_process, State) ->
125-
{noreply, State#state{pause = false}, {continue, maybe_run_fn}};
125+
{noreply, State#state{status = running}, {continue, maybe_run_fn}};
126126
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
127127
amoc_throttle_controller:telemetry_event(Name, request),
128128
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
@@ -155,25 +155,28 @@ format_status(#{state := #state{} = State} = FormatStatus) ->
155155
%% internal functions
156156
%%------------------------------------------------------------------------------
157157

158+
initial_state(_Name, Interval, infinity) ->
159+
#state{interval = Interval, n = infinity, max_n = infinity, delay_between_executions = 0};
160+
initial_state(_Name, Interval, 0) ->
161+
#state{interval = Interval, n = 0, max_n = 0, delay_between_executions = infinity};
158162
initial_state(Name, Interval, Rate) when Rate > 0 ->
159-
NewRate = case Rate < 5 of
160-
true ->
161-
Msg = <<"too low rate, please reduce NoOfProcesses">>,
162-
internal_error(Msg, Name, Rate, Interval),
163-
Rate;
164-
false ->
165-
Rate
166-
end,
167-
Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of
163+
case Rate < 5 of
164+
true ->
165+
Msg = <<"too low rate, please reduce NoOfProcesses">>,
166+
internal_warning(Msg, Name, Rate, Interval);
167+
false ->
168+
ok
169+
end,
170+
Delay = case {Interval, Interval div Rate, Interval rem Rate} of
168171
{0, _, _} -> 0; %% limit only No of simultaneous executions
169172
{_, I, _} when I < 10 ->
170173
Message = <<"too high rate, please increase NoOfProcesses">>,
171-
internal_error(Message, Name, Rate, Interval),
174+
internal_warning(Message, Name, Rate, Interval),
172175
10;
173176
{_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
174177
{_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
175178
end,
176-
#state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}.
179+
#state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}.
177180

178181
merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN},
179182
#state{n = OldN, max_n = OldMaxN} = OldState) ->
@@ -183,6 +186,8 @@ merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = Ma
183186
max_n = MaxN, tref = undefined},
184187
maybe_start_timer(NewState).
185188

189+
maybe_start_timer(#state{delay_between_executions = infinity, tref = undefined} = State) ->
190+
State#state{can_run_fn = false};
186191
maybe_start_timer(#state{delay_between_executions = 0, tref = undefined} = State) ->
187192
State#state{can_run_fn = true};
188193
maybe_start_timer(#state{delay_between_executions = D, tref = undefined} = State) ->
@@ -207,15 +212,20 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
207212
NewSchedule = lists:reverse(SchRev),
208213
NewState = State#state{schedule = NewSchedule, schedule_reversed = []},
209214
maybe_run_fn(NewState);
210-
maybe_run_fn(#state{interval = 0, pause = false, n = N} = State) when N > 0 ->
215+
maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 ->
211216
NewState = run_fn(State),
212217
maybe_run_fn(NewState);
213-
maybe_run_fn(#state{can_run_fn = true, pause = false, n = N} = State) when N > 0 ->
218+
maybe_run_fn(#state{can_run_fn = true, status = running, n = N} = State) when N > 0 ->
214219
NewState = run_fn(State),
215220
NewState#state{can_run_fn = false};
216221
maybe_run_fn(State) ->
217222
State.
218223

224+
run_fn(#state{schedule = [RunnerPid | T], name = Name, n = infinity} = State) ->
225+
erlang:monitor(process, RunnerPid),
226+
amoc_throttle_runner:run(RunnerPid),
227+
amoc_throttle_controller:telemetry_event(Name, execute),
228+
State#state{schedule = T};
219229
run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
220230
erlang:monitor(process, RunnerPid),
221231
amoc_throttle_runner:run(RunnerPid),
@@ -244,10 +254,10 @@ internal_event(Msg, #state{name = Name} = State) ->
244254
amoc_telemetry:execute_log(
245255
debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg).
246256

247-
-spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any().
248-
internal_error(Msg, Name, Rate, Interval) ->
257+
-spec internal_warning(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any().
258+
internal_warning(Msg, Name, Rate, Interval) ->
249259
amoc_telemetry:execute_log(
250-
error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg).
260+
warning, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg).
251261

252262
printable_state(#state{} = State) ->
253263
Fields = record_info(fields, state),

0 commit comments

Comments
 (0)