Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ lookup_dlx(#state{exchange_ref = DLXRef} = State0) ->
State = log_missing_dlx_once(State0),
{not_found, State};
{ok, X} ->
{X, State0}
State = clear_log_missing_dlx_once(State0),
{X, State}
end.

-spec forward(mc:state(), non_neg_integer(), rabbit_amqqueue:name(),
Expand Down Expand Up @@ -344,7 +345,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
[] ->
log_no_route_once(State1);
_ ->
State1
clear_log_no_route_once(State1)
end,
{RouteToQs, State2}
end,
Expand Down Expand Up @@ -499,8 +500,9 @@ redeliver0(#pending{delivery = Msg0,
%% Routes changed dynamically so that we don't await any publisher confirms anymore.
%% Since we also received at least one publisher confirm (mandatory flag semantics),
%% we can ack the message to the source quorum queue.
State0#state{pendings = maps:remove(OutSeq, Pendings),
settled_ids = [ConsumedId | SettledIds]};
State = State0#state{pendings = maps:remove(OutSeq, Pendings),
settled_ids = [ConsumedId | SettledIds]},
clear_log_no_route_once(State);
_ ->
%% Do not redeliver message to a target queue
%% 1. for which we already received a publisher confirm, or
Expand All @@ -513,7 +515,7 @@ redeliver0(#pending{delivery = Msg0,
State1 = log_cycles(Cycles, DLRKeys, State0),
case RouteToQs of
[] ->
State1;
log_no_route_once(State1);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you delete your DLX and then try to forward a dead-letter message, you'll get the warning about the DLX not existing. If you then create the DLX but don't yet have a binding that makes the route to the DLQ viable, we previously weren't logging the no-route warning. I think the old code was assuming that, because we're redelivering in this function, we would've already warned about no-route.

_ ->
Pend = Pend0#pending{publish_count = PublishCount + 1,
last_published_at = os:system_time(millisecond),
Expand All @@ -523,7 +525,7 @@ redeliver0(#pending{delivery = Msg0,
%% Any target queue that rejected previously and still need
%% to be routed to is moved back to 'unsettled'.
rejected = []},
State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)},
State = clear_log_no_route_once(State0#state{pendings = maps:update(OutSeq, Pend, Pendings)}),
Options = #{correlation => OutSeq},
deliver_to_queues(Msg,
Options,
Expand Down Expand Up @@ -633,6 +635,19 @@ log_missing_dlx_once(#state{exchange_ref = DlxResource,
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource)]),
State#state{logged = maps:put(missing_dlx, DlxResource, Logged)}.

clear_log_missing_dlx_once(#state{exchange_ref = DlxResource,
queue_ref = QueueResource,
pendings = Pendings,
logged = #{missing_dlx := MissingDlx} = Logged} = State) ->
?LOG_INFO("Dead-letter-exchange ~ts found for quorum ~ts. Forwarding was previously "
"blocked since the configured dead-letter-exchange ~ts could not be found. "
"Forwarding of ~b pending dead-letter messages will be attempted.",
[rabbit_misc:rs(DlxResource), rabbit_misc:rs(QueueResource),
rabbit_misc:rs(MissingDlx), maps:size(Pendings)]),
State#state{logged = maps:remove(missing_dlx, Logged)};
clear_log_missing_dlx_once(State) ->
State.

log_no_route_once(#state{exchange_ref = SameDlx,
routing_key = SameRoutingKey,
logged = #{no_route := {SameDlx, SameRoutingKey}}} = State) ->
Expand All @@ -653,6 +668,22 @@ log_no_route_once(#state{queue_ref = QueueResource,
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource), RoutingKey]),
State#state{logged = maps:put(no_route, {DlxResource, RoutingKey}, Logged)}.

clear_log_no_route_once(#state{exchange_ref = DlxResource,
routing_key = RoutingKey,
queue_ref = QueueResource,
pendings = Pendings,
logged = #{no_route := {OldDlx, OldRoutingKey}} = Logged} = State) ->
?LOG_INFO("Discovered a route to forward dead-letter messages from quorum ~ts on "
"configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts'. "
"Previously dead-letter messages could not be forwarded on configured "
"dead-letter-exchange ~ts and dead-letter-routing-key '~ts'. "
"Forwarding of ~b pending dead-letter messages will be attempted.",
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource),
RoutingKey, rabbit_misc:rs(OldDlx), OldRoutingKey, maps:size(Pendings)]),
State#state{logged = maps:remove(no_route, Logged)};
clear_log_no_route_once(State) ->
State.

log_cycles(Cycles, RoutingKeys, State) ->
lists:foldl(fun(Cycle, S) -> log_cycle_once(Cycle, RoutingKeys, S) end, State, Cycles).

Expand Down
Loading