Skip to content

DO NOT MERGE (needs signed CLA) Add a configurable limit for number of exchanges #14304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,20 @@ end}.
{mapping, "max_message_size", "rabbit.max_message_size",
[{datatype, integer}, {validators, ["max_message_size"]}]}.

{mapping, "cluster_exchange_limit", "rabbit.cluster_exchange_limit",
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.

{translation, "rabbit.cluster_exchange_limit",
fun(Conf) ->
case cuttlefish:conf_get("cluster_exchange_limit", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) -> Val;
_ -> cuttlefish:invalid("should be a non-negative integer")
end
end
}.

%% Customising Socket Options.
%%
%% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for
Expand Down
16 changes: 11 additions & 5 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,11 @@ count_in_mnesia() ->
mnesia:table_info(?MNESIA_TABLE, size).

count_in_khepri() ->
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:count(Path) of
{ok, Count} -> Count;
_ -> 0
try
ets:info(?KHEPRI_PROJECTION, size)
catch
error:badarg ->
0
end.

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -869,7 +870,12 @@ exists_in_mnesia(Name) ->
ets:member(?MNESIA_TABLE, Name).

exists_in_khepri(Name) ->
rabbit_khepri:exists(khepri_exchange_path(Name)).
try
ets:member(?KHEPRI_PROJECTION, Name)
catch
error:badarg ->
false
end.

%% -------------------------------------------------------------------
%% clear().
Expand Down
25 changes: 24 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ serial(X) ->
Internal :: boolean(),
Args :: rabbit_framing:amqp_table(),
Username :: rabbit_types:username(),
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
Ret :: {ok, rabbit_types:exchange()} |
{error, timeout} |
%% May exit with `#amqp_error{}` if validations fail:
rabbit_types:channel_exit().

declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
ok = check_exchange_limits(XName),
X = rabbit_exchange_decorator:set(
rabbit_policy:set(#exchange{name = XName,
type = Type,
Expand Down Expand Up @@ -141,6 +145,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
{ok, X}
end.

check_exchange_limits(XName) ->
Limit = rabbit_misc:get_env(rabbit, cluster_exchange_limit, infinity),
case rabbit_db_exchange:count() >= Limit of
false ->
ok;
true ->
case rabbit_db_exchange:exists(XName) of
true ->
%% Allow re-declares of existing exchanges when at the
%% exchange limit.
ok;
Comment on lines +154 to +158
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: allowing redeclares is important because if you reach the exchange limit and restart a Rabbit node, it will fail to boot due to the exchange declarations done in boot steps

false ->
rabbit_misc:protocol_error(
precondition_failed,
"cannot declare ~ts: exchange limit of ~tp is reached",
[rabbit_misc:rs(XName), Limit])
end
end.

%% Used with binaries sent over the wire; the type may not exist.

-spec check_type
Expand Down
78 changes: 66 additions & 12 deletions deps/rabbit/test/cluster_limit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-compile([nowarn_export_all, export_all]).

-define(EXCHANGE_LIMIT, 10).

all() ->
[
Expand All @@ -22,7 +23,8 @@ groups() ->
[
{clustered, [],
[
{size_2, [], [queue_limit]}
{size_2, [], [queue_limit,
exchange_limit]}
]}
].

Expand All @@ -34,7 +36,8 @@ init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{quorum_tick_interval, 1000},
{cluster_queue_limit, 3}]}),
{cluster_queue_limit, 3},
{cluster_exchange_limit, ?EXCHANGE_LIMIT}]}),
rabbit_ct_helpers:run_setup_steps(Config1, []).

end_per_suite(Config) ->
Expand Down Expand Up @@ -101,48 +104,99 @@ queue_limit(Config) ->
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q1 = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1)),
declare_queue(Ch, Q1)),

Q2 = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2)),
declare_queue(Ch, Q2)),

Q3 = ?config(alt_2_queue_name, Config),
?assertEqual({'queue.declare_ok', Q3, 0, 0},
declare(Ch, Q3)),
declare_queue(Ch, Q3)),
Q4 = ?config(over_limit_queue_name, Config),
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch, Q4)),
declare_queue(Ch, Q4)),

%% Trying the second server, in the cluster, but no queues on it,
%% but should still fail as the limit is cluster wide.
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch2, Q4)),
declare_queue(Ch2, Q4)),

%Trying other types of queues
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
ok.

declare(Ch, Q) ->
declare(Ch, Q, []).
exchange_limit(Config) ->
DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []),
?assert(?EXCHANGE_LIMIT > DefaultXs),

declare(Ch, Q, Args) ->
[Server0, Server1] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),

%% Reach the limit.
[begin
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
#'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>)
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],

%% Trying to declare the next exchange fails.
OverLimitXName = <<"over-limit-x">>,
?assertExit(
{{shutdown, {server_initiated_close, 406,
<<"PRECONDITION_FAILED", _/binary>>}}, _},
declare_exchange(Ch1, OverLimitXName, <<"fanout">>)),

%% Existing exchanges can be re-declared.
ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])),
#'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>),

%% The limit is cluster wide: the other node cannot declare the exchange
%% either.
?assertExit(
{{shutdown, {server_initiated_close, 406,
<<"PRECONDITION_FAILED", _/binary>>}}, _},
declare_exchange(Ch2, OverLimitXName, <<"fanout">>)),

%% Clean up extra exchanges
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0),
[begin
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
#'exchange.delete_ok'{} = amqp_channel:call(
Ch3,
#'exchange.delete'{exchange = XName})
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],

ok.

%% -------------------------------------------------------------------

declare_queue(Ch, Q) ->
declare_queue(Ch, Q, []).

declare_queue(Ch, Q, Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
auto_delete = false,
arguments = Args}).

declare_exchange(Ch, Name, Type) ->
amqp_channel:call(Ch, #'exchange.declare'{exchange = Name,
type = Type,
durable = true}).

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].
Loading