Skip to content

Commit 2ae0719

Browse files
committed
Introduce call to throttle-unlock mechanism
1 parent a2f225d commit 2ae0719

File tree

4 files changed

+38
-7
lines changed

4 files changed

+38
-7
lines changed

src/throttle/amoc_throttle.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
%% API
66
-export([start/2, stop/1,
77
send/2, send/3, wait/1,
8-
run/2, pause/1, resume/1,
8+
run/2, pause/1, resume/1, unlock/1,
99
change_rate/2, change_rate/3,
1010
change_rate_gradually/2, change_rate_gradually/6]).
1111

@@ -74,6 +74,11 @@ pause(Name) ->
7474
resume(Name) ->
7575
amoc_throttle_controller:resume(Name).
7676

77+
%% @doc Unlocks executions for the given `Name' as if `Rate' was set to `infinity'.
78+
-spec unlock(name()) -> ok | {error, any()}.
79+
unlock(Name) ->
80+
amoc_throttle_controller:unlock(Name).
81+
7782
%% @doc Sets `Throttle' for `Name' according to the given values.
7883
%%
7984
%% Can change whether Amoc throttle limits `Name' to parallel executions or to `Rate' per `Interval',

src/throttle/amoc_throttle_controller.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
%% API
1010
-export([start_link/0,
1111
ensure_throttle_processes_started/2,
12-
pause/1, resume/1, stop/1,
12+
pause/1, resume/1, unlock/1, stop/1,
1313
change_rate/3, change_rate_gradually/2,
1414
raise_event_on_slave_node/2, telemetry_event/2]).
1515

@@ -112,6 +112,10 @@ pause(Name) ->
112112
resume(Name) ->
113113
gen_server:call(?MASTER_SERVER, {resume, Name}).
114114

115+
-spec unlock(name()) -> ok | {error, any()}.
116+
unlock(Name) ->
117+
gen_server:call(?MASTER_SERVER, {unlock, Name}).
118+
115119
-spec change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}.
116120
change_rate(Name, Rate, Interval) ->
117121
gen_server:call(?MASTER_SERVER, {change_rate, Name, Rate, Interval}).
@@ -362,7 +366,9 @@ run_cmd(Pid, stop) ->
362366
run_cmd(Pid, pause) ->
363367
amoc_throttle_process:pause(Pid);
364368
run_cmd(Pid, resume) ->
365-
amoc_throttle_process:resume(Pid).
369+
amoc_throttle_process:resume(Pid);
370+
run_cmd(Pid, unlock) ->
371+
amoc_throttle_process:unlock(Pid).
366372

367373
-spec verify_config(amoc_throttle:gradual_rate_config()) -> gradual_rate_change() | {error, any()}.
368374
verify_config(Config) ->

src/throttle/amoc_throttle_process.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
update/3,
1313
pause/1,
1414
resume/1,
15+
unlock/1,
1516
get_state/1,
1617
get_throttle_process/1,
1718
get_throttle_processes/1
@@ -30,7 +31,7 @@
3031
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute
3132

3233
-record(state, {can_run_fn = true :: boolean(),
33-
status = running :: running | paused,
34+
status = running :: running | paused | unlocked,
3435
max_n :: infinity | non_neg_integer(),
3536
name :: atom(),
3637
n :: infinity | non_neg_integer(),
@@ -69,6 +70,10 @@ pause(Pid) ->
6970
resume(Pid) ->
7071
gen_server:cast(Pid, resume_process).
7172

73+
-spec unlock(pid()) -> ok.
74+
unlock(Pid) ->
75+
gen_server:cast(Pid, unlock_process).
76+
7277
-spec get_state(pid()) -> map().
7378
get_state(Pid) ->
7479
gen_server:call(Pid, get_state).
@@ -123,6 +128,8 @@ handle_cast(pause_process, State) ->
123128
{noreply, State#state{status = paused}, {continue, maybe_run_fn}};
124129
handle_cast(resume_process, State) ->
125130
{noreply, State#state{status = running}, {continue, maybe_run_fn}};
131+
handle_cast(unlock_process, State) ->
132+
{noreply, State#state{status = unlocked}, {continue, maybe_run_fn}};
126133
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
127134
amoc_throttle_controller:telemetry_event(Name, request),
128135
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
@@ -212,6 +219,9 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
212219
NewSchedule = lists:reverse(SchRev),
213220
NewState = State#state{schedule = NewSchedule, schedule_reversed = []},
214221
maybe_run_fn(NewState);
222+
maybe_run_fn(#state{interval = _, status = unlocked, n = N} = State) when N > 0 ->
223+
NewState = run_fn(State),
224+
maybe_run_fn(NewState);
215225
maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 ->
216226
NewState = run_fn(State),
217227
maybe_run_fn(NewState);

test/throttle_SUITE.erl

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ groups() ->
3434
async_runner_dies_while_waiting_raises_exit,
3535
async_runner_dies_when_throttler_dies,
3636
run_with_interval_zero_limits_only_number_of_parallel_executions,
37-
pause_and_resume,
37+
pause_and_resume_and_unlock,
3838
get_state
3939
]}
4040
].
@@ -286,7 +286,7 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) ->
286286
amoc_throttle:send(?FUNCTION_NAME, receive_this),
287287
?assertMatch(ok, ?RECV(receive_this, 200)).
288288

289-
pause_and_resume(_) ->
289+
pause_and_resume_and_unlock(_) ->
290290
%% Start 100-per-10ms throttle with a single process
291291
?assertMatch({ok, started},
292292
amoc_throttle:start(?FUNCTION_NAME,
@@ -299,7 +299,17 @@ pause_and_resume(_) ->
299299
?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)),
300300
%% After resume the message is then received
301301
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
302-
?assertMatch(ok, ?RECV(receive_this, 200)).
302+
?assertMatch(ok, ?RECV(receive_this, 200)),
303+
%% If unlocked, all messages are always received
304+
?assertMatch(ok, amoc_throttle:unlock(?FUNCTION_NAME)),
305+
amoc_throttle:send(?FUNCTION_NAME, receive_this),
306+
?assertMatch(ok, ?RECV(receive_this, 200)),
307+
%% From unlock it can resume
308+
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
309+
State = get_state_of_one_process(?FUNCTION_NAME),
310+
?assertMatch(#{name := ?FUNCTION_NAME,
311+
delay_between_executions := 12},
312+
State).
303313

304314
get_state(_) ->
305315
?assertMatch({ok, started},

0 commit comments

Comments
 (0)