Skip to content

Commit a5f279b

Browse files
committed
Keep a single table of users in the worker supervisors
1 parent 0d11905 commit a5f279b

File tree

4 files changed

+144
-169
lines changed

4 files changed

+144
-169
lines changed

src/amoc_controller.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
%% ------------------------------------------------------------------
7272
%% gen_server Function Exports
7373
%% ------------------------------------------------------------------
74-
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
74+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
7575

7676
%% ------------------------------------------------------------------
7777
%% API Function Definitions
@@ -198,6 +198,11 @@ handle_cast(_Msg, State) ->
198198
handle_info(_Msg, State) ->
199199
{noreply, State}.
200200

201+
%% @private
202+
-spec terminate(term(), state()) -> any().
203+
terminate(_Reason, _State) ->
204+
amoc_users_sup:terminate_all_children().
205+
201206
%% ------------------------------------------------------------------
202207
%% internal functions
203208
%% ------------------------------------------------------------------

src/users/amoc_users_sup.erl

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
%%
55
%% It spawns a pool of workers as big as online schedulers. When starting a new user, as the user is
66
%% identified by ID, a worker will be chosen for this user based on its ID
7-
%% (see get_sup_for_user_id/1).
7+
%% (see gen_sup_from_userid/1).
88
%%
99
%% The currently running number of users is stored in an atomic that all workers update and the
1010
%% controller can read.
@@ -16,9 +16,8 @@
1616
-export([start_link/0, init/1]).
1717

1818
%% API
19-
-export([incr_no_of_users/1, decr_no_of_users/1, count_no_of_users/0,
20-
start_child/3, stop_child/2, start_children/3, stop_children/2, terminate_all_children/0]).
21-
19+
-export([handle_up_user/3, handle_down_user/2, count_no_of_users/0]).
20+
-export([start_children/3, stop_child/2, stop_children/2, terminate_all_children/0]).
2221
-export([distribute/2, get_all_children/0]).
2322

2423
-type count() :: non_neg_integer().
@@ -34,26 +33,34 @@
3433
sups_count :: pos_integer()
3534
}).
3635

36+
-define(SUPERVISOR, amoc_users_sup).
37+
-define(STORAGE, amoc_users_sup_storage).
38+
-define(TABLE, amoc_users_sup_table).
39+
3740
%% Supervisor
3841

3942
%% @private
4043
-spec start_link() -> supervisor:startlink_ret().
4144
start_link() ->
42-
Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, no_args),
43-
UserSups = supervisor:which_children(?MODULE),
44-
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups],
45+
Ret = supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, no_args),
46+
UserSups = supervisor:which_children(?SUPERVISOR),
47+
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups,
48+
is_integer(N), is_pid(Pid)],
4549
IndexedSups = lists:keysort(2, IndexedSupsUnsorted),
4650
UserSupPidsTuple = list_to_tuple([ Pid || {Pid, _} <- IndexedSups ]),
4751
SupCount = tuple_size(UserSupPidsTuple),
4852
Atomics = atomics:new(1 + SupCount, [{signed, false}]),
4953
Storage = #storage{user_count = Atomics, sups = UserSupPidsTuple,
5054
sups_indexed = IndexedSups, sups_count = SupCount},
51-
persistent_term:put(?MODULE, Storage),
55+
persistent_term:put(?STORAGE, Storage),
5256
Ret.
5357

5458
%% @private
5559
-spec init(no_args) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
5660
init(no_args) ->
61+
EtsOpts = [ordered_set, public, named_table,
62+
{read_concurrency, true}, {write_concurrency, auto}],
63+
_Table = ets:new(?TABLE, EtsOpts),
5764
Specs = [
5865
#{
5966
id => {amoc_users_worker_sup, N},
@@ -73,18 +80,20 @@ indexes() ->
7380
%% API
7481
-spec count_no_of_users() -> count().
7582
count_no_of_users() ->
76-
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
83+
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
7784
atomics:get(Atomics, 1).
7885

79-
-spec incr_no_of_users(non_neg_integer()) -> any().
80-
incr_no_of_users(SupNum) when SupNum > 1 ->
81-
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
86+
-spec handle_up_user(non_neg_integer(), pid(), amoc_scenario:user_id()) -> any().
87+
handle_up_user(SupNum, Pid, Id) when SupNum > 1 ->
88+
ets:insert(?TABLE, {Pid, Id}),
89+
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
8290
atomics:add(Atomics, SupNum, 1),
8391
atomics:add(Atomics, 1, 1).
8492

85-
-spec decr_no_of_users(non_neg_integer()) -> ok.
86-
decr_no_of_users(SupNum) when SupNum > 1 ->
87-
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
93+
-spec handle_down_user(non_neg_integer(), pid()) -> ok.
94+
handle_down_user(SupNum, Pid) when SupNum > 1 ->
95+
ets:delete(?TABLE, Pid),
96+
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
8897
atomics:sub(Atomics, SupNum, 1),
8998
case atomics:sub_get(Atomics, 1, 1) of
9099
0 ->
@@ -93,24 +102,22 @@ decr_no_of_users(SupNum) when SupNum > 1 ->
93102
ok
94103
end.
95104

96-
-spec start_child(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
97-
start_child(Scenario, Id, ScenarioState) ->
98-
Sup = get_sup_for_user_id(Id),
99-
amoc_users_worker_sup:start_child(Sup, Scenario, Id, ScenarioState).
100-
101105
-spec stop_child(pid(), boolean()) -> ok.
102106
stop_child(Pid, Force) ->
103-
amoc_users_worker_sup:stop_child(Pid, Force).
107+
case ets:lookup(?TABLE, Pid) of
108+
[Object] ->
109+
Sup = gen_sup_from_userid(Object),
110+
amoc_users_worker_sup:stop_children(Sup, [Pid], Force);
111+
_ ->
112+
ok
113+
end.
104114

105115
%% Group all children based on ID to their respective worker supervisor and cast a request with each
106116
%% group at once. This way we reduce the number of casts to each worker to always one, instead of
107117
%% depending on the number of users.
108118
-spec start_children(amoc:scenario(), [amoc_scenario:user_id()], any()) -> ok.
109119
start_children(Scenario, UserIds, ScenarioState) ->
110-
State = persistent_term:get(?MODULE),
111-
#storage{sups = Supervisors, sups_indexed = IndexedSups, sups_count = SupCount} = State,
112-
Acc = maps:from_list([ {Sup, []} || {Sup, _} <- IndexedSups ]),
113-
Assignments = assign_users_to_sups(SupCount, Supervisors, UserIds, Acc),
120+
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, UserIds),
114121
CastFun = fun(Sup, Users) ->
115122
amoc_users_worker_sup:start_children(Sup, Scenario, Users, ScenarioState)
116123
end,
@@ -120,47 +127,53 @@ start_children(Scenario, UserIds, ScenarioState) ->
120127
%% in order to load-balance the request among all workers.
121128
-spec stop_children(non_neg_integer(), boolean()) -> non_neg_integer().
122129
stop_children(Count, Force) ->
123-
{CountRemove, Assignments} = assign_counts(Count),
124-
[ amoc_users_worker_sup:stop_children(Sup, Int, Force) || {Sup, Int} <- Assignments ],
125-
CountRemove.
130+
Users = case ets:match_object(?TABLE, '$1', Count) of
131+
'$end_of_table' ->
132+
[];
133+
{Objects, _} ->
134+
Objects
135+
end,
136+
stop_children_assignments(Users, Force),
137+
length(Users).
126138

127139
-spec get_all_children() -> [{pid(), amoc_scenario:user_id()}].
128140
get_all_children() ->
129-
#storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE),
130-
All = [ amoc_users_worker_sup:get_all_children(Sup) || {Sup, _} <- IndexedSups ],
131-
lists:flatten(All).
141+
ets:tab2list(?TABLE).
132142

133143
-spec terminate_all_children() -> any().
134144
terminate_all_children() ->
135-
#storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE),
136-
[ amoc_users_worker_sup:terminate_all_children(Sup) || {Sup, _} <- IndexedSups ].
145+
Match = ets:match_object(?TABLE, '$1', 500),
146+
do_terminate_all_my_children(Match).
147+
148+
-spec stop_children_assignments([{pid(), amoc_scenario:user_id()}], boolean()) -> ok.
149+
stop_children_assignments(Users, Force) ->
150+
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, fun get_pid/1, Users),
151+
CastFun = fun(Sup, Assignment) ->
152+
amoc_users_worker_sup:stop_children(Sup, Assignment, Force)
153+
end,
154+
maps:foreach(CastFun, Assignments).
155+
156+
%% ets:continuation/0 type is unfortunately not exported from the ets module.
157+
-spec do_terminate_all_my_children({[tuple()], dynamic()} | '$end_of_table') -> ok.
158+
do_terminate_all_my_children({Users, Continuation}) ->
159+
stop_children_assignments(Users, true),
160+
Match = ets:match_object(Continuation),
161+
do_terminate_all_my_children(Match);
162+
do_terminate_all_my_children('$end_of_table') ->
163+
ok.
137164

138165
%% Helpers
139-
-spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid().
140-
get_sup_for_user_id(Id) ->
141-
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
166+
-spec gen_sup_from_userid({pid(), amoc_scenario:user_id()} | amoc_scenario:user_id()) -> pid().
167+
gen_sup_from_userid({_Pid, Id}) ->
168+
gen_sup_from_userid(Id);
169+
gen_sup_from_userid(Id) ->
170+
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?STORAGE),
142171
Index = erlang:phash2(Id, SupCount) + 1,
143172
element(Index, Supervisors).
144173

145-
%% assign which users each worker will be requested to add
146-
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
147-
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
148-
assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) ->
149-
Index = erlang:phash2(Id, SupCount) + 1,
150-
ChosenSup = element(Index, Supervisors),
151-
Vs = maps:get(ChosenSup, Acc),
152-
NewAcc = Acc#{ChosenSup := [Id | Vs]},
153-
assign_users_to_sups(SupCount, Supervisors, Ids, NewAcc);
154-
assign_users_to_sups(_, _, [], Acc) ->
155-
Acc.
156-
157-
%% assign how many users each worker will be requested to remove,
158-
%% taking care of the fact that worker might not have enough users.
159-
-spec assign_counts(count()) -> {count(), assignment()}.
160-
assign_counts(Total) ->
161-
#storage{user_count = Atomics, sups_indexed = Indexed} = persistent_term:get(?MODULE),
162-
SupervisorsWithCounts = [ {Sup, atomics:get(Atomics, SupPos)} || {Sup, SupPos} <- Indexed ],
163-
distribute(Total, SupervisorsWithCounts).
174+
-spec get_pid({pid(), amoc_scenario:user_id()}) -> pid().
175+
get_pid({Pid, _}) ->
176+
Pid.
164177

165178
-spec distribute(count(), assignment()) -> {count(), assignment()}.
166179
distribute(Total, SupervisorsWithCounts) ->

0 commit comments

Comments
 (0)