@@ -114,89 +114,87 @@ where
114114 let poll_step_name = step_name. clone ( ) ;
115115 let poll_output_sender = output_sender. clone ( ) ;
116116 let mut polling_task = tokio:: spawn ( async move {
117- let mut last_poll = tokio:: time:: Instant :: now ( ) ;
118117 let poll_duration = poll_step. lock ( ) . await . poll_interval ( ) ;
119118
120119 while poll_step. lock ( ) . await . should_continue_polling ( ) . await {
121120 // It's possible that the channel always has items, so we need to ensure we call `poll` manually if we need to
122- if last_poll. elapsed ( ) >= poll_duration {
123- let polling_duration_for_logging = Instant :: now ( ) ;
124- let result = match poll_step. lock ( ) . await . poll ( ) . await {
125- Ok ( result) => result,
126- Err ( e) => {
127- error ! (
128- step_name = poll_step_name,
129- error = e. to_string( ) ,
130- "Failed to poll"
131- ) ;
132- break ;
133- } ,
134- } ;
135- match StepMetricsBuilder :: default ( )
136- . labels ( StepMetricLabels {
137- step_name : poll_step_name. clone ( ) ,
138- } )
139- . polling_duration_in_secs (
140- polling_duration_for_logging. elapsed ( ) . as_secs_f64 ( ) ,
141- )
142- . build ( )
143- {
144- Ok ( mut metrics) => metrics. log_metrics ( ) ,
145- Err ( e) => {
146- error ! (
147- step_name = poll_step_name,
148- error = e. to_string( ) ,
149- "Failed to log metrics"
150- ) ;
151- break ;
152- } ,
153- }
154- if let Some ( outputs_with_context) = result {
155- for output_with_context in outputs_with_context {
156- match StepMetricsBuilder :: default ( )
157- . labels ( StepMetricLabels {
158- step_name : poll_step_name. clone ( ) ,
159- } )
160- . latest_polled_version ( output_with_context. metadata . end_version )
161- . latest_polled_transaction_timestamp (
162- output_with_context. get_start_transaction_timestamp_unix ( ) ,
163- )
164- . polled_transaction_latency (
165- output_with_context. get_transaction_latency ( ) ,
166- )
167- . num_polled_transactions_count (
168- output_with_context. get_num_transactions ( ) ,
169- )
170- . polled_size_in_bytes (
171- output_with_context. metadata . total_size_in_bytes ,
172- )
173- . build ( )
174- {
175- Ok ( mut metrics) => metrics. log_metrics ( ) ,
176- Err ( e) => {
177- error ! (
178- step_name = poll_step_name,
179- error = e. to_string( ) ,
180- "Failed to log metrics"
181- ) ;
182- break ;
183- } ,
184- }
185- match poll_output_sender. send ( output_with_context) . await {
186- Ok ( _) => { } ,
187- Err ( e) => {
188- error ! (
189- step_name = poll_step_name,
190- error = e. to_string( ) ,
191- "Error sending output to channel"
192- ) ;
193- break ;
194- } ,
195- }
196- }
197- } ;
198- last_poll = tokio:: time:: Instant :: now ( ) ;
121+ let polling_duration_for_logging = Instant :: now ( ) ;
122+ let result = match poll_step. lock ( ) . await . poll ( ) . await {
123+ Ok ( result) => result,
124+ Err ( e) => {
125+ error ! (
126+ step_name = poll_step_name,
127+ error = e. to_string( ) ,
128+ "Failed to poll"
129+ ) ;
130+ break ;
131+ } ,
132+ } ;
133+ match StepMetricsBuilder :: default ( )
134+ . labels ( StepMetricLabels {
135+ step_name : poll_step_name. clone ( ) ,
136+ } )
137+ . polling_duration_in_secs (
138+ polling_duration_for_logging. elapsed ( ) . as_secs_f64 ( ) ,
139+ )
140+ . build ( )
141+ {
142+ Ok ( mut metrics) => metrics. log_metrics ( ) ,
143+ Err ( e) => {
144+ error ! (
145+ step_name = poll_step_name,
146+ error = e. to_string( ) ,
147+ "Failed to log metrics"
148+ ) ;
149+ break ;
150+ } ,
199151 }
152+ if let Some ( outputs_with_context) = result {
153+ for output_with_context in outputs_with_context {
154+ match StepMetricsBuilder :: default ( )
155+ . labels ( StepMetricLabels {
156+ step_name : poll_step_name. clone ( ) ,
157+ } )
158+ . latest_polled_version ( output_with_context. metadata . end_version )
159+ . latest_polled_transaction_timestamp (
160+ output_with_context. get_start_transaction_timestamp_unix ( ) ,
161+ )
162+ . polled_transaction_latency (
163+ output_with_context. get_transaction_latency ( ) ,
164+ )
165+ . num_polled_transactions_count (
166+ output_with_context. get_num_transactions ( ) ,
167+ )
168+ . polled_size_in_bytes (
169+ output_with_context. metadata . total_size_in_bytes ,
170+ )
171+ . build ( )
172+ {
173+ Ok ( mut metrics) => metrics. log_metrics ( ) ,
174+ Err ( e) => {
175+ error ! (
176+ step_name = poll_step_name,
177+ error = e. to_string( ) ,
178+ "Failed to log metrics"
179+ ) ;
180+ break ;
181+ } ,
182+ }
183+ match poll_output_sender. send ( output_with_context) . await {
184+ Ok ( _) => { } ,
185+ Err ( e) => {
186+ error ! (
187+ step_name = poll_step_name,
188+ error = e. to_string( ) ,
189+ "Error sending output to channel"
190+ ) ;
191+ break ;
192+ } ,
193+ }
194+ }
195+ } ;
196+
197+ tokio:: time:: sleep ( poll_duration) . await ;
200198 }
201199 } ) ;
202200
0 commit comments