Skip to content

By @dcorbacho and me: introduce [cluster-]local shovels, adopt message containers for shovels #14256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f1a816c
Shovel: use message containers
dcorbacho Jun 13, 2025
a96c957
amqp10_msg: type spec & code refactor
dcorbacho Jun 20, 2025
aea1650
AMQP10 shovel: make bare message inmutable
dcorbacho Jul 4, 2025
aaaf8c3
Local shovels
dcorbacho Jun 23, 2025
d49ce79
WIP
dcorbacho Jul 16, 2025
868a361
Local shovel: use queue name to filter source/dest messages
dcorbacho Jul 17, 2025
acbc20a
Local shovel: Initialise delivery count for credit
dcorbacho Jul 17, 2025
fceb461
Local shovel: fix initial delivery count and state handling
dcorbacho Jul 18, 2025
00152e7
Local shovels: set link credit
dcorbacho Jul 18, 2025
a4383c0
Local shovels: renew credit
dcorbacho Jul 20, 2025
a68b825
Local shovel: finish credit flow handling
dcorbacho Jul 20, 2025
ff05a90
Local shovels: remove rabbit_log and switch to LOG_ macros
dcorbacho Jul 22, 2025
47510a6
Local shovels: remove unused parameter
dcorbacho Jul 22, 2025
e00d83a
Shovel tests: ignore nodename
dcorbacho Jul 22, 2025
165716f
Local shovels: grant credit after confirming
dcorbacho Jul 23, 2025
422a2ae
local_dynamic_SUITE: await credit for publishing links
michaelklishin Jul 23, 2025
bcbfc6f
Local shovel: handle destination queue events
dcorbacho Jul 24, 2025
c883bd7
Local shovels: remove unused prefetch count
dcorbacho Jul 24, 2025
3aaec2b
Local shovel: fix credit handling order
dcorbacho Jul 28, 2025
6bbadb7
local_dynamic_SUITE: ignore two expected crash reports in the logs
michaelklishin Jul 28, 2025
d88a541
Local shovel: more tests
dcorbacho Jul 29, 2025
831d527
Local shovels: handle credit on sender side
dcorbacho Jul 29, 2025
6912439
Shovel management: add local shovels
dcorbacho Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,29 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
%% A disposition will be notified to the sender by a message of the
%% following stucture:
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()], boolean()) -> amqp10_msg().
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
Body = [#'v1_0.data'{content = Bin}],
new(DeliveryTag, Body, Settled);
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{
transfer = #'v1_0.transfer'{
delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
%% This lib is safe by default.
header = #'v1_0.header'{durable = true},
body = Body}.
Transfer = #'v1_0.transfer'{
delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
case is_amqp10_body(Body) orelse (not is_list(Body)) of
true ->
#amqp10_msg{
transfer = Transfer,
%% This lib is safe by default.
header = #'v1_0.header'{durable = true},
body = Body};
false ->
from_amqp_records([Transfer | Body])
end.

%% @doc Create a new settled amqp10 message using the specified delivery tag
%% and body.
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()]) -> amqp10_msg().
new(DeliveryTag, Body) ->
new(DeliveryTag, Body, false).

Expand Down Expand Up @@ -462,3 +468,19 @@ uint(B) -> {uint, B}.

has_value(undefined) -> false;
has_value(_) -> true.

is_amqp10_body(#'v1_0.amqp_value'{}) ->
true;
is_amqp10_body(List) when is_list(List) ->
lists:all(fun(#'v1_0.data'{}) ->
true;
(_) ->
false
end, List) orelse
lists:all(fun(#'v1_0.amqp_sequence'{}) ->
true;
(_) ->
false
end, List);
is_amqp10_body(_) ->
false.
1 change: 0 additions & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2445,7 +2445,6 @@ incoming_link_transfer(
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
messages_received(Settled),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKeys, Mc1, PermCache} ->
Expand Down
129 changes: 23 additions & 106 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

-module(rabbit_amqp091_shovel).

-define(APP, rabbitmq_shovel).

-behaviour(rabbit_shovel_behaviour).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_shovel.hrl").
-include_lib("kernel/include/logger.hrl").

Expand All @@ -34,7 +33,7 @@
ack/3,
nack/3,
status/1,
forward/4
forward/3
]).

%% Function references should not be stored on the metadata store.
Expand All @@ -57,7 +56,7 @@ parse(_Name, {source, Source}) ->
CArgs = proplists:get_value(consumer_args, Source, []),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
resource_decl => decl_fun({source, Source}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {source, Source}),
queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
prefetch_count => Prefetch,
Expand All @@ -73,7 +72,7 @@ parse(Name, {destination, Dest}) ->
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
#{module => ?MODULE,
uris => proplists:get_value(uris, Dest),
resource_decl => decl_fun({destination, Dest}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {destination, Dest}),
props_fun => PropsFun2,
fields_fun => PubFieldsFun,
add_forward_headers => AFH,
Expand Down Expand Up @@ -170,8 +169,8 @@ forward_pending(State) ->
case pop_pending(State) of
empty ->
State;
{{Tag, Props, Payload}, S} ->
S2 = do_forward(Tag, Props, Payload, S),
{{Tag, Mc}, S} ->
S2 = do_forward(Tag, Mc, S),
S3 = control_throttle(S2),
case is_blocked(S3) of
true ->
Expand All @@ -184,91 +183,50 @@ forward_pending(State) ->
end
end.

forward(IncomingTag, Props, Payload, State) ->
forward(IncomingTag, Mc, State) ->
case is_blocked(State) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
PendingEntry = {IncomingTag, Mc},
add_pending(PendingEntry, State);
false ->
State1 = do_forward(IncomingTag, Props, Payload, State),
State1 = do_forward(IncomingTag, Mc, State),
control_throttle(State1)
end.

do_forward(IncomingTag, Props, Payload,
do_forward(IncomingTag, Mc0,
State0 = #{dest := #{props_fun := {M, F, Args},
current := {_, _, DstUri},
fields_fun := {Mf, Ff, Argsf}}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
Exchange = maps:get(exchange, Props, undefined),
RoutingKey = maps:get(routing_key, Props, undefined),
Exchange = mc:exchange(Mc0),
RoutingKey = case mc:routing_keys(Mc0) of
[RK | _] -> RK;
Any -> Any
end,
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
Mc = mc:convert(mc_amqpl, Mc0),
{Props, Payload} = rabbit_basic_common:from_content(mc:protocol_state(Mc)),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
payload = Payload},
publish(IncomingTag, Method1, Msg1, State0).

props_from_map(Map) ->
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
content_encoding = maps:get(content_encoding, Map, undefined),
headers = maps:get(headers, Map, undefined),
delivery_mode = maps:get(delivery_mode, Map, undefined),
priority = maps:get(priority, Map, undefined),
correlation_id = maps:get(correlation_id, Map, undefined),
reply_to = maps:get(reply_to, Map, undefined),
expiration = maps:get(expiration, Map, undefined),
message_id = maps:get(message_id, Map, undefined),
timestamp = maps:get(timestamp, Map, undefined),
type = maps:get(type, Map, undefined),
user_id = maps:get(user_id, Map, undefined),
app_id = maps:get(app_id, Map, undefined),
cluster_id = maps:get(cluster_id, Map, undefined)}.

map_from_props(#'P_basic'{content_type = Content_type,
content_encoding = Content_encoding,
headers = Headers,
delivery_mode = Delivery_mode,
priority = Priority,
correlation_id = Correlation_id,
reply_to = Reply_to,
expiration = Expiration,
message_id = Message_id,
timestamp = Timestamp,
type = Type,
user_id = User_id,
app_id = App_id,
cluster_id = Cluster_id}) ->
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
({K, V}, Acc) -> Acc#{K => V}
end, #{}, [{content_type, Content_type},
{content_encoding, Content_encoding},
{headers, Headers},
{delivery_mode, Delivery_mode},
{priority, Priority},
{correlation_id, Correlation_id},
{reply_to, Reply_to},
{expiration, Expiration},
{message_id, Message_id},
{timestamp, Timestamp},
{type, Type},
{user_id, User_id},
{app_id, App_id},
{cluster_id, Cluster_id}
]).

handle_source(#'basic.consume_ok'{}, State) ->
State;
handle_source({#'basic.deliver'{delivery_tag = Tag,
exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{props = Props0, payload = Payload}}, State) ->
Props = (map_from_props(Props0))#{exchange => Exchange,
routing_key => RoutingKey},
Content = rabbit_basic_common:build_content(Props0, Payload),
Msg0 = mc:init(mc_amqpl, Content, #{}),
Msg1 = mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg0),
Msg = mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg1),
% forward to destination
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
rabbit_shovel_behaviour:forward(Tag, Msg, State);

handle_source({'EXIT', Conn, Reason},
#{source := #{current := {Conn, _, _}}}) ->
Expand Down Expand Up @@ -584,47 +542,6 @@ props_fun_timestamp_header({M, F, Args}, SrcUri, DestUri, Props) ->
rabbit_shovel_util:add_timestamp_header(
apply(M, F, Args ++ [SrcUri, DestUri, Props])).

parse_declaration({[], Acc}) ->
Acc;
parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
catch exit:Reason -> fail(Reason)
end,
case proplists:get_keys(Props) -- FieldNames of
[] -> ok;
UnknownFields -> fail({unknown_fields, Method, UnknownFields})
end,
{Res, _Idx} = lists:foldl(
fun (K, {R, Idx}) ->
NewR = case proplists:get_value(K, Props) of
undefined -> R;
V -> setelement(Idx, R, V)
end,
{NewR, Idx + 1}
end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
FieldNames),
parse_declaration({Rest, [Res | Acc]});
parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
fail({expected_method_field_list, Method, Props});
parse_declaration({[Method | Rest], Acc}) ->
parse_declaration({[{Method, []} | Rest], Acc}).

decl_fun({source, Endpoint}) ->
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
[] ->
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
true -> case proplists:get_value(queue, Endpoint) of
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
Queue -> {?MODULE, check_fun, [Queue]}
end;
false -> {?MODULE, decl_fun, []}
end;
Decl -> {?MODULE, decl_fun, [Decl]}
end;
decl_fun({destination, Endpoint}) ->
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
{?MODULE, decl_fun, [Decl]}.

decl_fun(Decl, _Conn, Ch) ->
[begin
amqp_channel:call(Ch, M)
Expand Down
Loading
Loading