Skip to content

Commit 033ab45

Browse files
committed
rabbitmq_*_federation: Stop links during plugin stop
[Why] Links are started by the plugins but put under the `rabbit` supervision tree. The federation plugins supervision tree is empty unfortunately... Links are stopped by a boot step executed by `rabbit`, as a concequence of unregistering the plugins' parameters. Unfortunately, links can be terminated if the channel, and implicitly the connection stops. This happens when the `amqp_client` application stops. We end up with a race here: * Because the federation plugins supervision trees are empty and the application stop functions barely stop the pg group (which doesn't terminate the group members), nothing waits for the links to stop. Therefore, `rabbit` can stop `amqp_client' which is a dependency of the federation plugins. Therefore, the links underlying channels and connections are stopped. * `rabbit` unregister the federation parameters, terminating the links. The exchange links `terminate/2` function needs the channel to delete the remote queue. But the channel and the underlying connection might be gone. This simply logs a `badmatch` exception: [error] <0.884.0> Federation link could not create a disposable (one-off) channel due to an error error: {badmatch, [error] <0.884.0> {error, [error] <0.884.0> {noproc, [error] <0.884.0> {gen_server, [error] <0.884.0> call, [error] <0.884.0> [<0.911.0>, [error] <0.884.0> {command, [error] <0.884.0> {open_channel, [error] <0.884.0> none, [error] <0.884.0> {amqp_selective_consumer, [error] <0.884.0> []}}}, [error] <0.884.0> 130000]}}}} [How] The solution is to make sure links are stopped as part of the stop of the plugins. `rabbit_federation_pg:stop_scope/1` is expanded to stop all members of all groups in this scope, before terminating the pg scope itself. The new code waits for the stopped processes to exit. We have to handle the `EXIT` signal in the link processes and change their restart strategy in their parent supervisor from permanent to transient. This ensures they are restarted only if they crash. This also skips a error log message about each stopped link.
1 parent 50e5fc7 commit 033ab45

File tree

4 files changed

+30
-2
lines changed

4 files changed

+30
-2
lines changed

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ handle_info(check_internal_exchange, State = #state{internal_exchange = IntXName
184184
{noreply, State#state{internal_exchange_timer = TRef}}
185185
end;
186186

187+
handle_info({'EXIT', _From, Reason}, State) ->
188+
{stop, Reason, State};
189+
187190
handle_info(Msg, State) ->
188191
{stop, {unexpected_info, Msg}, State}.
189192

deps/rabbitmq_federation_common/src/rabbit_federation_link_sup.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ specs(LinkMod, XorQ) ->
9999

100100
spec(LinkMod, U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
101101
{U, {LinkMod, start_link, [{U, XName}]},
102-
{permanent, Delay}, ?WORKER_WAIT, worker,
102+
{transient, Delay}, ?WORKER_WAIT, worker,
103103
[LinkMod]};
104104

105105
spec(LinkMod, Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) ->
106106
{Upstream, {LinkMod, start_link, [{Upstream, Q}]},
107-
{permanent, Delay}, ?WORKER_WAIT, worker,
107+
{transient, Delay}, ?WORKER_WAIT, worker,
108108
[LinkMod]}.
109109

110110
name(#exchange{name = XName}) -> XName;

deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,29 @@ stop_scope(Scope) ->
1717
case whereis(Scope) of
1818
Pid when is_pid(Pid) ->
1919
rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]),
20+
Groups = pg:which_groups(Scope),
21+
lists:foreach(
22+
fun(Group) ->
23+
stop_group(Scope, Group)
24+
end, Groups),
2025
exit(Pid, normal);
2126
_ ->
2227
ok
2328
end.
29+
30+
stop_group(Scope, Group) ->
31+
Members = pg:get_local_members(Scope, Group),
32+
MRefs = [erlang:monitor(process, Member) || Member <- Members],
33+
lists:foreach(
34+
fun(Member) ->
35+
exit(Member, normal)
36+
end, Members),
37+
lists:foreach(
38+
fun(MRef) ->
39+
receive
40+
{'DOWN', MRef, process, _Member, _Info} ->
41+
logger:alert("Member ~p stopped: ~0p", [_Member, _Info]),
42+
ok
43+
end
44+
end, MRefs),
45+
ok.

deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
162162
QName = amqqueue:get_name(Q),
163163
handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State);
164164

165+
handle_info({'EXIT', _From, Reason}, State) ->
166+
{stop, Reason, State};
167+
165168
handle_info(Msg, State) ->
166169
{stop, {unexpected_info, Msg}, State}.
167170

0 commit comments

Comments
 (0)