1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: sync:: Arc ;
16+
1517use databend_common_exception:: Result ;
1618use databend_common_expression:: types:: Int32Type ;
1719use databend_common_expression:: DataBlock ;
1820use databend_common_expression:: FromData ;
1921use databend_common_pipeline_core:: processors:: connect;
22+ use databend_common_pipeline_core:: processors:: BlockLimit ;
2023use databend_common_pipeline_core:: processors:: DuplicateProcessor ;
2124use databend_common_pipeline_core:: processors:: Event ;
2225use databend_common_pipeline_core:: processors:: InputPort ;
@@ -40,9 +43,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
4043 let downstream_input2 = InputPort :: create ( ) ;
4144
4245 unsafe {
43- connect ( & input, & upstream_output) ;
44- connect ( & downstream_input1, & output1) ;
45- connect ( & downstream_input2, & output2) ;
46+ connect ( & input, & upstream_output, Arc :: new ( BlockLimit :: default ( ) ) ) ;
47+ connect ( & downstream_input1, & output1, Arc :: new ( BlockLimit :: default ( ) ) ) ;
48+ connect ( & downstream_input2, & output2, Arc :: new ( BlockLimit :: default ( ) ) ) ;
4649 }
4750
4851 downstream_input1. set_need_data ( ) ;
@@ -68,9 +71,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
6871 let downstream_input2 = InputPort :: create ( ) ;
6972
7073 unsafe {
71- connect ( & input, & upstream_output) ;
72- connect ( & downstream_input1, & output1) ;
73- connect ( & downstream_input2, & output2) ;
74+ connect ( & input, & upstream_output, Arc :: new ( BlockLimit :: default ( ) ) ) ;
75+ connect ( & downstream_input1, & output1, Arc :: new ( BlockLimit :: default ( ) ) ) ;
76+ connect ( & downstream_input2, & output2, Arc :: new ( BlockLimit :: default ( ) ) ) ;
7477 }
7578
7679 downstream_input1. finish ( ) ;
@@ -94,9 +97,9 @@ async fn test_duplicate_output_finish() -> Result<()> {
9497 let downstream_input2 = InputPort :: create ( ) ;
9598
9699 unsafe {
97- connect ( & input, & upstream_output) ;
98- connect ( & downstream_input1, & output1) ;
99- connect ( & downstream_input2, & output2) ;
100+ connect ( & input, & upstream_output, Arc :: new ( BlockLimit :: default ( ) ) ) ;
101+ connect ( & downstream_input1, & output1, Arc :: new ( BlockLimit :: default ( ) ) ) ;
102+ connect ( & downstream_input2, & output2, Arc :: new ( BlockLimit :: default ( ) ) ) ;
100103 }
101104
102105 downstream_input1. finish ( ) ;
@@ -120,9 +123,9 @@ async fn test_duplicate_processor() -> Result<()> {
120123 let downstream_input2 = InputPort :: create ( ) ;
121124
122125 unsafe {
123- connect ( & input, & upstream_output) ;
124- connect ( & downstream_input1, & output1) ;
125- connect ( & downstream_input2, & output2) ;
126+ connect ( & input, & upstream_output, Arc :: new ( BlockLimit :: default ( ) ) ) ;
127+ connect ( & downstream_input1, & output1, Arc :: new ( BlockLimit :: default ( ) ) ) ;
128+ connect ( & downstream_input2, & output2, Arc :: new ( BlockLimit :: default ( ) ) ) ;
126129 }
127130
128131 downstream_input1. set_need_data ( ) ;
0 commit comments