Skip to content

Commit 36bc319

Browse files
committed
Add a config option to limit the number of exchanges
1 parent 107c03f commit 36bc319

File tree

3 files changed

+103
-12
lines changed

3 files changed

+103
-12
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,20 @@ end}.
10121012
{mapping, "max_message_size", "rabbit.max_message_size",
10131013
[{datatype, integer}, {validators, ["max_message_size"]}]}.
10141014

1015+
{mapping, "exchange_max", "rabbit.exchange_max",
1016+
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.
1017+
1018+
{translation, "rabbit.exchange_max",
1019+
fun(Conf) ->
1020+
case cuttlefish:conf_get("exchange_max", Conf, undefined) of
1021+
undefined -> cuttlefish:unset();
1022+
infinity -> infinity;
1023+
Val when is_integer(Val) -> Val;
1024+
_ -> cuttlefish:invalid("should be a non-negative integer")
1025+
end
1026+
end
1027+
}.
1028+
10151029
%% Customising Socket Options.
10161030
%%
10171031
%% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ serial(X) ->
105105
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
106106

107107
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
108+
ok = check_exchange_limits(XName),
108109
X = rabbit_exchange_decorator:set(
109110
rabbit_policy:set(#exchange{name = XName,
110111
type = Type,
@@ -141,6 +142,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
141142
{ok, X}
142143
end.
143144

145+
check_exchange_limits(XName) ->
146+
Limit = rabbit_misc:get_env(rabbit, exchange_max, infinity),
147+
case rabbit_db_exchange:count() >= Limit of
148+
false ->
149+
ok;
150+
true ->
151+
case rabbit_db_exchange:exists(XName) of
152+
true ->
153+
%% Allow re-declares of existing exchanges when at the
154+
%% exchange limit.
155+
ok;
156+
false ->
157+
rabbit_misc:protocol_error(
158+
precondition_failed,
159+
"cannot declare ~ts: exchange limit of ~tp is reached",
160+
[rabbit_misc:rs(XName), Limit])
161+
end
162+
end.
163+
144164
%% Used with binaries sent over the wire; the type may not exist.
145165

146166
-spec check_type

deps/rabbit/test/cluster_limit_SUITE.erl

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313
-compile([nowarn_export_all, export_all]).
1414

15+
-define(EXCHANGE_LIMIT, 10).
1516

1617
all() ->
1718
[
@@ -22,7 +23,8 @@ groups() ->
2223
[
2324
{clustered, [],
2425
[
25-
{size_2, [], [queue_limit]}
26+
{size_2, [], [queue_limit,
27+
exchange_limit]}
2628
]}
2729
].
2830

@@ -34,7 +36,8 @@ init_per_suite(Config0) ->
3436
rabbit_ct_helpers:log_environment(),
3537
Config1 = rabbit_ct_helpers:merge_app_env(
3638
Config0, {rabbit, [{quorum_tick_interval, 1000},
37-
{cluster_queue_limit, 3}]}),
39+
{cluster_queue_limit, 3},
40+
{exchange_max, ?EXCHANGE_LIMIT}]}),
3841
rabbit_ct_helpers:run_setup_steps(Config1, []).
3942

4043
end_per_suite(Config) ->
@@ -101,48 +104,102 @@ queue_limit(Config) ->
101104
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
102105
Q1 = ?config(queue_name, Config),
103106
?assertEqual({'queue.declare_ok', Q1, 0, 0},
104-
declare(Ch, Q1)),
107+
declare_queue(Ch, Q1)),
105108

106109
Q2 = ?config(alt_queue_name, Config),
107110
?assertEqual({'queue.declare_ok', Q2, 0, 0},
108-
declare(Ch, Q2)),
111+
declare_queue(Ch, Q2)),
109112

110113
Q3 = ?config(alt_2_queue_name, Config),
111114
?assertEqual({'queue.declare_ok', Q3, 0, 0},
112-
declare(Ch, Q3)),
115+
declare_queue(Ch, Q3)),
113116
Q4 = ?config(over_limit_queue_name, Config),
114117
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
115118
?assertExit(
116119
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
117-
declare(Ch, Q4)),
120+
declare_queue(Ch, Q4)),
118121

119122
%% Trying the second server, in the cluster, but no queues on it,
120123
%% but should still fail as the limit is cluster wide.
121124
?assertExit(
122125
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
123-
declare(Ch2, Q4)),
126+
declare_queue(Ch2, Q4)),
124127

125128
%Trying other types of queues
126129
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
127130
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
128131
?assertExit(
129132
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
130-
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
133+
declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
131134
?assertExit(
132135
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
133-
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
136+
declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
134137
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
135138
ok.
136139

137-
declare(Ch, Q) ->
138-
declare(Ch, Q, []).
140+
exchange_limit(Config) ->
141+
DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []),
142+
?assert(?EXCHANGE_LIMIT > DefaultXs),
139143

140-
declare(Ch, Q, Args) ->
144+
[Server0, Server1] =
145+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
146+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
147+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
148+
149+
%% Reach the limit.
150+
[begin
151+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
152+
#'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>)
153+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
154+
155+
%% Trying to declare the next exchange fails.
156+
OverLimitXName = <<"over-limit-x">>,
157+
ExpectedError = list_to_binary(rabbit_misc:format(
158+
"PRECONDITION_FAILED - cannot declare "
159+
"exchange '~s' in vhost '/': "
160+
"exchange limit of ~b is reached",
161+
[OverLimitXName, ?EXCHANGE_LIMIT])),
162+
?assertExit(
163+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
164+
declare_exchange(Ch1, OverLimitXName, <<"fanout">>)),
165+
166+
%% Existing exchanges can be re-declared.
167+
ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])),
168+
#'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>),
169+
170+
%% The limit is cluster wide: the other node cannot declare the exchange
171+
%% either.
172+
?assertExit(
173+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
174+
declare_exchange(Ch2, OverLimitXName, <<"fanout">>)),
175+
176+
%% Clean up extra exchanges
177+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0),
178+
[begin
179+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
180+
#'exchange.delete_ok'{} = amqp_channel:call(
181+
Ch3,
182+
#'exchange.delete'{exchange = XName})
183+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
184+
185+
ok.
186+
187+
%% -------------------------------------------------------------------
188+
189+
declare_queue(Ch, Q) ->
190+
declare_queue(Ch, Q, []).
191+
192+
declare_queue(Ch, Q, Args) ->
141193
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
142194
durable = true,
143195
auto_delete = false,
144196
arguments = Args}).
145197

198+
declare_exchange(Ch, Name, Type) ->
199+
amqp_channel:call(Ch, #'exchange.declare'{exchange = Name,
200+
type = Type,
201+
durable = true}).
202+
146203
delete_queues() ->
147204
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
148205
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)