From f8ce467b916fca10980ea63b6454baad74b1cd7c Mon Sep 17 00:00:00 2001 From: iequidoo Date: Tue, 22 Jul 2025 11:41:52 -0300 Subject: [PATCH] fix: Prefetch messages in limited batches (#6915) I have logs from a user where messages are prefetched for long minutes, and while it's not a problem on its own, we can't rely that the connection overlives such a period, so make `fetch_new_messages()` prefetch (and then actually download) messages in batches of 500 messages. --- src/imap.rs | 44 ++++++++++++++++++++++++++++++++++++++------ src/imap/session.rs | 19 ++++++------------- 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/src/imap.rs b/src/imap.rs index 81f62721e4..a0b0fc4e27 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -555,10 +555,38 @@ impl Imap { } session.new_mail = false; + let mut read_cnt = 0; + loop { + let (n, fetch_more) = self + .fetch_new_msg_batch(context, session, folder, folder_meaning) + .await?; + read_cnt += n; + if !fetch_more { + return Ok(read_cnt > 0); + } + } + } + + /// Returns number of messages processed and whether the function should be called again. + async fn fetch_new_msg_batch( + &mut self, + context: &Context, + session: &mut Session, + folder: &str, + folder_meaning: FolderMeaning, + ) -> Result<(usize, bool)> { let uid_validity = get_uidvalidity(context, folder).await?; let old_uid_next = get_uid_next(context, folder).await?; + info!( + context, + "fetch_new_messages({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}." + ); - let msgs = session.prefetch(old_uid_next).await.context("prefetch")?; + let uids_to_prefetch = 500; + let msgs = session + .prefetch(old_uid_next, uids_to_prefetch) + .await + .context("prefetch")?; let read_cnt = msgs.len(); let download_limit = context.download_limit().await?; @@ -718,7 +746,8 @@ impl Imap { largest_uid_fetched }; - let actually_download_messages_future = async move { + let actually_download_messages_future = async { + let sender = sender; let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1)); let mut fetch_partially = false; uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1)); @@ -753,14 +782,17 @@ impl Imap { // if the message has arrived after selecting mailbox // and determining its UIDNEXT and before prefetch. let mut new_uid_next = largest_uid_fetched + 1; - if fetch_res.is_ok() { + let fetch_more = fetch_res.is_ok() && { + let prefetch_uid_next = old_uid_next + uids_to_prefetch; // If we have successfully fetched all messages we planned during prefetch, // then we have covered at least the range between old UIDNEXT // and UIDNEXT of the mailbox at the time of selecting it. - new_uid_next = max(new_uid_next, mailbox_uid_next); + new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next)); new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1); - } + + prefetch_uid_next < mailbox_uid_next + }; if new_uid_next > old_uid_next { set_uid_next(context, folder, new_uid_next).await?; } @@ -777,7 +809,7 @@ impl Imap { // establish a new session if this one is broken. fetch_res?; - Ok(read_cnt > 0) + Ok((read_cnt, fetch_more)) } /// Read the recipients from old emails sent by the user and add them as contacts. diff --git a/src/imap/session.rs b/src/imap/session.rs index eb73b4a35b..8588fbab8c 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -110,14 +110,16 @@ impl Session { Ok(list) } - /// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results - /// in the order of ascending delivery time to the server (INTERNALDATE). + /// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the + /// order of ascending delivery time to the server (INTERNALDATE). pub(crate) async fn prefetch( &mut self, uid_next: u32, + n_uids: u32, ) -> Result> { + let uid_last = uid_next.saturating_add(n_uids - 1); // fetch messages with larger UID than the last one seen - let set = format!("{uid_next}:*"); + let set = format!("{uid_next}:{uid_last}"); let mut list = self .uid_fetch(set, PREFETCH_FLAGS) .await @@ -126,16 +128,7 @@ impl Session { let mut msgs = BTreeMap::new(); while let Some(msg) = list.try_next().await? { if let Some(msg_uid) = msg.uid { - // If the mailbox is not empty, results always include - // at least one UID, even if last_seen_uid+1 is past - // the last UID in the mailbox. It happens because - // uid:* is interpreted the same way as *:uid. - // See for - // standard reference. Therefore, sometimes we receive - // already seen messages and have to filter them out. - if msg_uid >= uid_next { - msgs.insert((msg.internal_date(), msg_uid), msg); - } + msgs.insert((msg.internal_date(), msg_uid), msg); } }