@@ -302,14 +302,14 @@ handle_source(_Msg, _State) ->
302
302
303
303
handle_dest ({queue_event , _QRef , {confirm , MsgSeqNos , _QPid }},
304
304
#{ack_mode := on_confirm } = State ) ->
305
- confirm_to_inbound (fun (Tag , Multi , StateX ) ->
306
- rabbit_shovel_behaviour :ack (Tag , Multi , StateX )
307
- end , MsgSeqNos , false , State );
305
+ confirm_to_inbound (fun (Tag , StateX ) ->
306
+ rabbit_shovel_behaviour :ack (Tag , false , StateX )
307
+ end , MsgSeqNos , State );
308
308
handle_dest ({queue_event , _QRef , {reject_publish , Seq , _QPid }},
309
309
#{ack_mode := on_confirm } = State ) ->
310
- confirm_to_inbound (fun (Tag , Multi , StateX ) ->
311
- rabbit_shovel_behaviour :nack (Tag , Multi , StateX )
312
- end , Seq , false , State );
310
+ confirm_to_inbound (fun (Tag , StateX ) ->
311
+ rabbit_shovel_behaviour :nack (Tag , false , StateX )
312
+ end , Seq , State );
313
313
handle_dest ({{'DOWN' , # resource {name = Queue ,
314
314
kind = queue ,
315
315
virtual_host = VHost }}, _ , _ , _ , _ } ,
@@ -626,32 +626,18 @@ route(Msg, #{current := #{vhost := VHost}}) ->
626
626
Exchange = rabbit_exchange :lookup_or_die (ExchangeName ),
627
627
rabbit_exchange :route (Exchange , Msg , #{return_binding_keys => true }).
628
628
629
- remove_delivery_tags (Seq , false , Unacked , 0 ) ->
630
- {maps :remove (Seq , Unacked ), 1 };
631
- remove_delivery_tags (Seq , true , Unacked , Count ) ->
632
- case maps :size (Unacked ) of
633
- 0 -> {Unacked , Count };
634
- _ ->
635
- maps :fold (fun (K , _V , {Acc , Cnt }) when K =< Seq ->
636
- {maps :remove (K , Acc ), Cnt + 1 };
637
- (_K , _V , Acc ) -> Acc
638
- end , {Unacked , 0 }, Unacked )
639
- end .
640
-
641
-
642
- confirm_to_inbound (ConfirmFun , SeqNos , Multiple , State )
629
+ confirm_to_inbound (ConfirmFun , SeqNos , State )
643
630
when is_list (SeqNos ) ->
644
631
lists :foldl (fun (Seq , State0 ) ->
645
- confirm_to_inbound (ConfirmFun , Seq , Multiple , State0 )
632
+ confirm_to_inbound (ConfirmFun , Seq , State0 )
646
633
end , State , SeqNos );
647
- confirm_to_inbound (ConfirmFun , Seq , Multiple ,
634
+ confirm_to_inbound (ConfirmFun , Seq ,
648
635
State0 = #{dest := #{unacked := Unacked } = Dst }) ->
649
636
#{Seq := InTag } = Unacked ,
650
- State = ConfirmFun (InTag , Multiple , State0 ),
651
- {Unacked1 , Removed } = remove_delivery_tags (Seq , Multiple , Unacked , 0 ),
652
- rabbit_shovel_behaviour :decr_remaining (Removed ,
653
- State #{dest =>
654
- Dst #{unacked => Unacked1 }}).
637
+ State = ConfirmFun (InTag , State0 ),
638
+ Unacked1 = maps :remove (Seq , Unacked ),
639
+ rabbit_shovel_behaviour :decr_remaining (
640
+ 1 , State #{dest => Dst #{unacked => Unacked1 }}).
655
641
656
642
sent_delivery (#{source := #{current := #{consumer_tag := CTag ,
657
643
vhost := VHost ,
0 commit comments