Skip to content

Commit d4e06ad

Browse files
committed
Add a config option to limit the number of exchanges
1 parent 392e5f9 commit d4e06ad

File tree

3 files changed

+104
-13
lines changed

3 files changed

+104
-13
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,20 @@ end}.
10121012
{mapping, "max_message_size", "rabbit.max_message_size",
10131013
[{datatype, integer}, {validators, ["max_message_size"]}]}.
10141014

1015+
{mapping, "cluster_exchange_limit", "rabbit.cluster_exchange_limit",
1016+
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.
1017+
1018+
{translation, "rabbit.cluster_exchange_limit",
1019+
fun(Conf) ->
1020+
case cuttlefish:conf_get("cluster_exchange_limit", Conf, undefined) of
1021+
undefined -> cuttlefish:unset();
1022+
infinity -> infinity;
1023+
Val when is_integer(Val) -> Val;
1024+
_ -> cuttlefish:invalid("should be a non-negative integer")
1025+
end
1026+
end
1027+
}.
1028+
10151029
%% Customising Socket Options.
10161030
%%
10171031
%% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,13 @@ serial(X) ->
102102
Internal :: boolean(),
103103
Args :: rabbit_framing:amqp_table(),
104104
Username :: rabbit_types:username(),
105-
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
105+
Ret :: {ok, rabbit_types:exchange()} |
106+
{error, timeout} |
107+
%% May exit with `#amqp_error{}` if validations fail:
108+
rabbit_types:channel_exit().
106109

107110
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
111+
ok = check_exchange_limits(XName),
108112
X = rabbit_exchange_decorator:set(
109113
rabbit_policy:set(#exchange{name = XName,
110114
type = Type,
@@ -141,6 +145,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
141145
{ok, X}
142146
end.
143147

148+
check_exchange_limits(XName) ->
149+
Limit = rabbit_misc:get_env(rabbit, cluster_exchange_limit, infinity),
150+
case rabbit_db_exchange:count() >= Limit of
151+
false ->
152+
ok;
153+
true ->
154+
case rabbit_db_exchange:exists(XName) of
155+
true ->
156+
%% Allow re-declares of existing exchanges when at the
157+
%% exchange limit.
158+
ok;
159+
false ->
160+
rabbit_misc:protocol_error(
161+
precondition_failed,
162+
"cannot declare ~ts: exchange limit of ~tp is reached",
163+
[rabbit_misc:rs(XName), Limit])
164+
end
165+
end.
166+
144167
%% Used with binaries sent over the wire; the type may not exist.
145168

146169
-spec check_type

deps/rabbit/test/cluster_limit_SUITE.erl

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313
-compile([nowarn_export_all, export_all]).
1414

15+
-define(EXCHANGE_LIMIT, 10).
1516

1617
all() ->
1718
[
@@ -22,7 +23,8 @@ groups() ->
2223
[
2324
{clustered, [],
2425
[
25-
{size_2, [], [queue_limit]}
26+
{size_2, [], [queue_limit,
27+
exchange_limit]}
2628
]}
2729
].
2830

@@ -34,7 +36,8 @@ init_per_suite(Config0) ->
3436
rabbit_ct_helpers:log_environment(),
3537
Config1 = rabbit_ct_helpers:merge_app_env(
3638
Config0, {rabbit, [{quorum_tick_interval, 1000},
37-
{cluster_queue_limit, 3}]}),
39+
{cluster_queue_limit, 3},
40+
{cluster_exchange_limit, ?EXCHANGE_LIMIT}]}),
3841
rabbit_ct_helpers:run_setup_steps(Config1, []).
3942

4043
end_per_suite(Config) ->
@@ -101,48 +104,99 @@ queue_limit(Config) ->
101104
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
102105
Q1 = ?config(queue_name, Config),
103106
?assertEqual({'queue.declare_ok', Q1, 0, 0},
104-
declare(Ch, Q1)),
107+
declare_queue(Ch, Q1)),
105108

106109
Q2 = ?config(alt_queue_name, Config),
107110
?assertEqual({'queue.declare_ok', Q2, 0, 0},
108-
declare(Ch, Q2)),
111+
declare_queue(Ch, Q2)),
109112

110113
Q3 = ?config(alt_2_queue_name, Config),
111114
?assertEqual({'queue.declare_ok', Q3, 0, 0},
112-
declare(Ch, Q3)),
115+
declare_queue(Ch, Q3)),
113116
Q4 = ?config(over_limit_queue_name, Config),
114117
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
115118
?assertExit(
116119
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
117-
declare(Ch, Q4)),
120+
declare_queue(Ch, Q4)),
118121

119122
%% Trying the second server, in the cluster, but no queues on it,
120123
%% but should still fail as the limit is cluster wide.
121124
?assertExit(
122125
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
123-
declare(Ch2, Q4)),
126+
declare_queue(Ch2, Q4)),
124127

125128
%Trying other types of queues
126129
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
127130
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
128131
?assertExit(
129132
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
130-
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
133+
declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
131134
?assertExit(
132135
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
133-
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
136+
declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
134137
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
135138
ok.
136139

137-
declare(Ch, Q) ->
138-
declare(Ch, Q, []).
140+
exchange_limit(Config) ->
141+
DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []),
142+
?assert(?EXCHANGE_LIMIT > DefaultXs),
139143

140-
declare(Ch, Q, Args) ->
144+
[Server0, Server1] =
145+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
146+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
147+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
148+
149+
%% Reach the limit.
150+
[begin
151+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
152+
#'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>)
153+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
154+
155+
%% Trying to declare the next exchange fails.
156+
OverLimitXName = <<"over-limit-x">>,
157+
?assertExit(
158+
{{shutdown, {server_initiated_close, 406,
159+
<<"PRECONDITION_FAILED", _/binary>>}}, _},
160+
declare_exchange(Ch1, OverLimitXName, <<"fanout">>)),
161+
162+
%% Existing exchanges can be re-declared.
163+
ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])),
164+
#'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>),
165+
166+
%% The limit is cluster wide: the other node cannot declare the exchange
167+
%% either.
168+
?assertExit(
169+
{{shutdown, {server_initiated_close, 406,
170+
<<"PRECONDITION_FAILED", _/binary>>}}, _},
171+
declare_exchange(Ch2, OverLimitXName, <<"fanout">>)),
172+
173+
%% Clean up extra exchanges
174+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0),
175+
[begin
176+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
177+
#'exchange.delete_ok'{} = amqp_channel:call(
178+
Ch3,
179+
#'exchange.delete'{exchange = XName})
180+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
181+
182+
ok.
183+
184+
%% -------------------------------------------------------------------
185+
186+
declare_queue(Ch, Q) ->
187+
declare_queue(Ch, Q, []).
188+
189+
declare_queue(Ch, Q, Args) ->
141190
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
142191
durable = true,
143192
auto_delete = false,
144193
arguments = Args}).
145194

195+
declare_exchange(Ch, Name, Type) ->
196+
amqp_channel:call(Ch, #'exchange.declare'{exchange = Name,
197+
type = Type,
198+
durable = true}).
199+
146200
delete_queues() ->
147201
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
148202
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)