diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index f5b79370fcd6..8bf2b29d15b4 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1012,6 +1012,20 @@ end}. {mapping, "max_message_size", "rabbit.max_message_size", [{datatype, integer}, {validators, ["max_message_size"]}]}. +{mapping, "cluster_exchange_limit", "rabbit.cluster_exchange_limit", + [{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}. + +{translation, "rabbit.cluster_exchange_limit", + fun(Conf) -> + case cuttlefish:conf_get("cluster_exchange_limit", Conf, undefined) of + undefined -> cuttlefish:unset(); + infinity -> infinity; + Val when is_integer(Val) -> Val; + _ -> cuttlefish:invalid("should be a non-negative integer") + end + end +}. + %% Customising Socket Options. %% %% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 4d4fd8046480..38f29d0cda7d 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -266,10 +266,11 @@ count_in_mnesia() -> mnesia:table_info(?MNESIA_TABLE, size). count_in_khepri() -> - Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:count(Path) of - {ok, Count} -> Count; - _ -> 0 + try + ets:info(?KHEPRI_PROJECTION, size) + catch + error:badarg -> + 0 end. %% ------------------------------------------------------------------- @@ -869,7 +870,12 @@ exists_in_mnesia(Name) -> ets:member(?MNESIA_TABLE, Name). exists_in_khepri(Name) -> - rabbit_khepri:exists(khepri_exchange_path(Name)). + try + ets:member(?KHEPRI_PROJECTION, Name) + catch + error:badarg -> + false + end. %% ------------------------------------------------------------------- %% clear(). diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 274f22869644..eb7e2b0d9cc3 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -102,9 +102,13 @@ serial(X) -> Internal :: boolean(), Args :: rabbit_framing:amqp_table(), Username :: rabbit_types:username(), - Ret :: {ok, rabbit_types:exchange()} | {error, timeout}. + Ret :: {ok, rabbit_types:exchange()} | + {error, timeout} | + %% May exit with `#amqp_error{}` if validations fail: + rabbit_types:channel_exit(). declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) -> + ok = check_exchange_limits(XName), X = rabbit_exchange_decorator:set( rabbit_policy:set(#exchange{name = XName, type = Type, @@ -141,6 +145,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) -> {ok, X} end. +check_exchange_limits(XName) -> + Limit = rabbit_misc:get_env(rabbit, cluster_exchange_limit, infinity), + case rabbit_db_exchange:count() >= Limit of + false -> + ok; + true -> + case rabbit_db_exchange:exists(XName) of + true -> + %% Allow re-declares of existing exchanges when at the + %% exchange limit. + ok; + false -> + rabbit_misc:protocol_error( + precondition_failed, + "cannot declare ~ts: exchange limit of ~tp is reached", + [rabbit_misc:rs(XName), Limit]) + end + end. + %% Used with binaries sent over the wire; the type may not exist. -spec check_type diff --git a/deps/rabbit/test/cluster_limit_SUITE.erl b/deps/rabbit/test/cluster_limit_SUITE.erl index 004030381d3b..fdec644e79f6 100644 --- a/deps/rabbit/test/cluster_limit_SUITE.erl +++ b/deps/rabbit/test/cluster_limit_SUITE.erl @@ -12,6 +12,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -compile([nowarn_export_all, export_all]). +-define(EXCHANGE_LIMIT, 10). all() -> [ @@ -22,7 +23,8 @@ groups() -> [ {clustered, [], [ - {size_2, [], [queue_limit]} + {size_2, [], [queue_limit, + exchange_limit]} ]} ]. @@ -34,7 +36,8 @@ init_per_suite(Config0) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:merge_app_env( Config0, {rabbit, [{quorum_tick_interval, 1000}, - {cluster_queue_limit, 3}]}), + {cluster_queue_limit, 3}, + {cluster_exchange_limit, ?EXCHANGE_LIMIT}]}), rabbit_ct_helpers:run_setup_steps(Config1, []). end_per_suite(Config) -> @@ -101,48 +104,99 @@ queue_limit(Config) -> Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1), Q1 = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q1, 0, 0}, - declare(Ch, Q1)), + declare_queue(Ch, Q1)), Q2 = ?config(alt_queue_name, Config), ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Ch, Q2)), + declare_queue(Ch, Q2)), Q3 = ?config(alt_2_queue_name, Config), ?assertEqual({'queue.declare_ok', Q3, 0, 0}, - declare(Ch, Q3)), + declare_queue(Ch, Q3)), Q4 = ?config(over_limit_queue_name, Config), ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])), ?assertExit( {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, - declare(Ch, Q4)), + declare_queue(Ch, Q4)), %% Trying the second server, in the cluster, but no queues on it, %% but should still fail as the limit is cluster wide. ?assertExit( {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, - declare(Ch2, Q4)), + declare_queue(Ch2, Q4)), %Trying other types of queues ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0), ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1), ?assertExit( {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, - declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?assertExit( {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, - declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), ok. -declare(Ch, Q) -> - declare(Ch, Q, []). +exchange_limit(Config) -> + DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []), + ?assert(?EXCHANGE_LIMIT > DefaultXs), -declare(Ch, Q, Args) -> + [Server0, Server1] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1), + + %% Reach the limit. + [begin + XName = list_to_binary(rabbit_misc:format("x-~b", [N])), + #'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>) + end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)], + + %% Trying to declare the next exchange fails. + OverLimitXName = <<"over-limit-x">>, + ?assertExit( + {{shutdown, {server_initiated_close, 406, + <<"PRECONDITION_FAILED", _/binary>>}}, _}, + declare_exchange(Ch1, OverLimitXName, <<"fanout">>)), + + %% Existing exchanges can be re-declared. + ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])), + #'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>), + + %% The limit is cluster wide: the other node cannot declare the exchange + %% either. + ?assertExit( + {{shutdown, {server_initiated_close, 406, + <<"PRECONDITION_FAILED", _/binary>>}}, _}, + declare_exchange(Ch2, OverLimitXName, <<"fanout">>)), + + %% Clean up extra exchanges + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0), + [begin + XName = list_to_binary(rabbit_misc:format("x-~b", [N])), + #'exchange.delete_ok'{} = amqp_channel:call( + Ch3, + #'exchange.delete'{exchange = XName}) + end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)], + + ok. + +%% ------------------------------------------------------------------- + +declare_queue(Ch, Q) -> + declare_queue(Ch, Q, []). + +declare_queue(Ch, Q, Args) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true, auto_delete = false, arguments = Args}). +declare_exchange(Ch, Name, Type) -> + amqp_channel:call(Ch, #'exchange.declare'{exchange = Name, + type = Type, + durable = true}). + delete_queues() -> [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()].