Skip to content

Commit 0d11905

Browse files
committed
Asynchronously register users after interarrival throttle allows
Unfortunately we cannot set the interarrival throttle to occur before the `proc_lib:init_ack/2` call because this would block the supervisor. This change also allows us to init processes completely asynchronously, as proc_lib is sync
1 parent 513b3eb commit 0d11905

File tree

3 files changed

+45
-25
lines changed

3 files changed

+45
-25
lines changed

src/amoc_scenario.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ start(Scenario, Id, State) ->
9191
{false, false} ->
9292
exit("the scenario module must export either start/2 or start/1 function")
9393
end,
94-
infinity =:= amoc_controller:get_interarrival() orelse amoc_throttle:wait(interarrival),
9594
telemetry:span([amoc, scenario, start], Metadata, Span).
9695

9796
%% ------------------------------------------------------------------

src/users/amoc_user.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99

1010
-type state() :: term().
1111

12-
-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
13-
{ok, pid()} | {error, term()}.
12+
-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) -> {pid(), reference()}.
1413
start_link(Scenario, Id, State) ->
15-
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).
14+
proc_lib:spawn_opt(?MODULE, init, [self(), Scenario, Id, State], [link, monitor]).
1615

1716
-spec stop() -> ok.
1817
stop() ->
@@ -24,6 +23,7 @@ stop(Pid, Force) when is_pid(Pid) ->
2423

2524
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
2625
init(Parent, Scenario, Id, State) ->
27-
proc_lib:init_ack(Parent, {ok, self()}),
2826
process_flag(trap_exit, true),
27+
infinity =:= amoc_controller:get_interarrival() orelse amoc_throttle:wait(interarrival),
28+
amoc_users_worker_sup:child_up(Parent, Id),
2929
amoc_scenario:start(Scenario, Id, State).

src/users/amoc_users_worker_sup.erl

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
-export([start_link/1]).
1818
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
1919

20-
-export([start_child/4, stop_child/2, start_children/4, stop_children/3, terminate_all_children/1]).
20+
-export([start_child/4, stop_child/2, child_up/2,
21+
start_children/4, stop_children/3, terminate_all_children/1]).
2122

2223
-export([get_all_children/1]).
2324

2425
-record(state, {
2526
index :: non_neg_integer(),
2627
table :: ets:table(),
28+
waiting = #{} :: #{reference() := pid()},
2729
tasks = #{} :: #{reference() := pid()}
2830
}).
2931
-type state() :: #state{}.
@@ -39,6 +41,10 @@ start_link(N) ->
3941
start_child(Sup, Scenario, Id, ScenarioState) ->
4042
gen_server:cast(Sup, {start_child, Scenario, Id, ScenarioState}).
4143

44+
-spec child_up(pid(), amoc_scenario:user_id()) -> any().
45+
child_up(Sup, Id) ->
46+
gen_server:cast(Sup, {child_up, self(), Id}).
47+
4248
-spec start_children(pid(), amoc:scenario(), [amoc_scenario:user_id()], any()) -> ok.
4349
start_children(Sup, Scenario, UserIds, ScenarioState) ->
4450
gen_server:cast(Sup, {start_children, Scenario, UserIds, ScenarioState}).
@@ -84,10 +90,13 @@ handle_call(get_all_children, _From, #state{table = Table} = State) ->
8490
{stop_children, non_neg_integer(), boolean()} |
8591
terminate_all_children.
8692
handle_cast({start_child, Scenario, Id, ScenarioState}, State) ->
87-
do_start_child(Scenario, Id, ScenarioState, State),
88-
{noreply, State};
93+
NewState = do_start_child(Scenario, [Id], ScenarioState, State),
94+
{noreply, NewState};
8995
handle_cast({start_children, Scenario, Ids, ScenarioState}, State) ->
90-
[ do_start_child(Scenario, Id, ScenarioState, State) || Id <- Ids],
96+
NewState = do_start_child(Scenario, Ids, ScenarioState, State),
97+
{noreply, NewState};
98+
handle_cast({child_up, Pid, Id}, #state{index = N, table = Tid} = State) ->
99+
handle_up_user(Tid, Pid, Id, N),
91100
{noreply, State};
92101
handle_cast({stop_children, 0, _}, State) ->
93102
{noreply, State};
@@ -102,19 +111,26 @@ handle_cast({stop_children, Int, ForceRemove}, #state{table = Table} = State) ->
102111
{noreply, NewState};
103112
handle_cast(terminate_all_children, State) ->
104113
NewState = do_terminate_all_my_children(State),
105-
{noreply, NewState};
106-
handle_cast(_Msg, State) ->
107-
{noreply, State}.
114+
{noreply, NewState}.
108115

109116
%% @private
110117
-spec handle_info(Request, state()) -> {noreply, state()} when
111118
Request :: {child_up, pid(), amoc_scenario:user_id()} |
112119
{'DOWN', reference(), process, pid(), term()} |
113120
{'EXIT', pid(), term()}.
114-
handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{tasks = Tasks} = State) ->
115-
{noreply, State#state{tasks = maps:remove(Ref, Tasks)}};
116-
handle_info({'EXIT', Pid, _Reason}, #state{index = N, table = Table} = State) ->
117-
handle_down_user(Table, Pid, N),
121+
handle_info({'DOWN', Ref, process, Pid, _Reason},
122+
#state{waiting = Waiting, tasks = Tasks} = State) ->
123+
case {Waiting, Tasks} of
124+
{#{Pid := Ref}, _} ->
125+
flush_exit(Pid),
126+
{noreply, State#state{waiting = maps:remove(Pid, Waiting)}};
127+
{_, #{Ref := Pid}} ->
128+
{noreply, State#state{tasks = maps:remove(Ref, Tasks)}};
129+
_ ->
130+
{noreply, State}
131+
end;
132+
handle_info({'EXIT', Pid, _Reason}, #state{index = N, table = Tid} = State) ->
133+
handle_down_user(Tid, Pid, N),
118134
{noreply, State};
119135
handle_info(_Info, State) ->
120136
{noreply, State}.
@@ -126,14 +142,19 @@ terminate(_Reason, State) ->
126142

127143
%% Helpers
128144

129-
-spec do_start_child(module(), amoc_scenario:user_id(), term(), state()) -> any().
130-
do_start_child(Scenario, Id, ScenarioState, #state{index = N, table = Table}) ->
131-
case amoc_user:start_link(Scenario, Id, ScenarioState) of
132-
{ok, Pid} ->
133-
handle_up_user(Table, Pid, Id, N);
134-
_ ->
135-
ok
136-
end.
145+
-spec flush_exit(pid()) -> ok.
146+
flush_exit(Pid) ->
147+
unlink(Pid),
148+
receive {'EXIT', Pid, _} -> ok after 0 -> ok end.
149+
150+
-spec do_start_child(amoc:scenario(), [amoc_scenario:user_id()], term(), state()) -> state().
151+
do_start_child(Scenario, Ids, ScenarioState, #state{waiting = Waiting} = State) ->
152+
Fun = fun(Id, Acc) ->
153+
{Pid, Ref} = amoc_user:start_link(Scenario, Id, ScenarioState),
154+
Acc#{Pid => Ref}
155+
end,
156+
NewWaiting = lists:foldl(Fun, Waiting, Ids),
157+
State#state{waiting = NewWaiting}.
137158

138159
-spec handle_up_user(ets:table(), pid(), amoc_scenario:user_id(), non_neg_integer()) -> any().
139160
handle_up_user(Table, Pid, Id, SupNum) ->
@@ -156,7 +177,7 @@ maybe_track_task_to_stop_my_children(State, Pids, false) ->
156177
State;
157178
maybe_track_task_to_stop_my_children(#state{tasks = Tasks} = State, Pids, true) ->
158179
{Pid, Ref} = spawn_monitor(shutdown_and_kill_after_timeout_fun(Pids)),
159-
State#state{tasks = Tasks#{Pid => Ref}}.
180+
State#state{tasks = Tasks#{Ref => Pid}}.
160181

161182
-spec shutdown_and_kill_after_timeout_fun(pid() | [pid()]) -> fun(() -> term()).
162183
shutdown_and_kill_after_timeout_fun([_ | _] = Pids) ->

0 commit comments

Comments
 (0)