Skip to content

Commit fccc22c

Browse files
committed
better threading handling
1 parent c8c1c4d commit fccc22c

File tree

2 files changed

+24
-35
lines changed

2 files changed

+24
-35
lines changed

src/files.rs

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::{Result, anyhow};
22
#[cfg(not(test))]
33
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
4+
use rayon::prelude::*;
45
use rusqlite::Connection;
56
use std::{
67
error::Error,
@@ -133,7 +134,7 @@ pub fn index_files_recursively(
133134
Ok(())
134135
}
135136

136-
pub fn reencode_files(conn: Connection, handler: Arc<AtomicBool>, threads: usize) -> Result<()> {
137+
pub fn reencode_files(conn: Connection, handler: Arc<AtomicBool>) -> Result<()> {
137138
#[cfg(not(test))]
138139
let bar = ProgressBar::with_draw_target(
139140
Some(conn.get_toencode_number()?),
@@ -144,41 +145,25 @@ pub fn reencode_files(conn: Connection, handler: Arc<AtomicBool>, threads: usize
144145

145146
let files = conn.get_toencode_files()?;
146147

147-
let pool = rayon::ThreadPoolBuilder::new()
148-
.num_threads(threads)
149-
.build()?;
150-
151148
let lock = Arc::new(Mutex::new(conn));
152149

153-
pool.scope(|scope| {
154-
for file in files {
155-
if handler.load(Ordering::SeqCst) {
156-
scope.spawn(|_| {
157-
let newconn = if let Ok(conn) = lock.lock() {
158-
conn
159-
} else {
160-
eprintln!("Error setting up lock on file:\t{}", file.to_string_lossy());
161-
return;
162-
};
163-
if !file.exists() {
164-
let _ = newconn.remove_file(&file);
165-
#[cfg(not(test))]
166-
bar.dec_length(1);
167-
return;
168-
}
169-
170-
if let Err(error) = handle_encode(&file) {
171-
eprintln!("{}", FileError::new(&file, error));
172-
} else {
173-
if let Err(error) = newconn.update_file(&file) {
174-
eprintln!("{}", FileError::new(file, error));
175-
}
176-
#[cfg(not(test))]
177-
bar.inc(1)
178-
}
179-
});
150+
files.par_iter().for_each(|file| {
151+
if handler.load(Ordering::SeqCst) {
152+
let conn = match lock.lock() {
153+
Ok(conn) => conn,
154+
Err(_) => {
155+
eprintln!("Lock poisoned on file:\t{}", file.to_string_lossy());
156+
return;
157+
}
158+
};
159+
if let Err(error) = handle_encode(file) {
160+
eprintln!("{}", FileError::new(file, error));
180161
} else {
181-
break;
162+
if let Err(error) = conn.update_file(file) {
163+
eprintln!("{}", FileError::new(file, error));
164+
}
165+
#[cfg(not(test))]
166+
bar.inc(1)
182167
}
183168
}
184169
});
@@ -258,7 +243,7 @@ mod tests {
258243
let temp = handler.clone();
259244
index_files_recursively(Path::new("./testfiles"), &conn, temp).unwrap();
260245
println!("\n{}", conn.get_toencode_number().unwrap());
261-
reencode_files(conn, handler, 2).unwrap();
246+
reencode_files(conn, handler).unwrap();
262247
let conn = Connection::new(Some(&dbname)).unwrap();
263248
println!("\n{}", conn.get_toencode_number().unwrap());
264249
std::fs::remove_file(dbname).unwrap();

src/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ fn main() -> Result<()> {
116116
if args.get_flag("doit") {
117117
let hanlder = running.clone();
118118
let threads = *args.get_one::<usize>("threads").unwrap();
119-
files::reencode_files(conn, hanlder, threads)?;
119+
let pool = rayon::ThreadPoolBuilder::new()
120+
.num_threads(threads)
121+
.build()?;
122+
123+
pool.install(|| files::reencode_files(conn, hanlder))?;
120124
}
121125
Ok::<(), anyhow::Error>(())
122126
}

0 commit comments

Comments
 (0)