From 7448ef7ad712800e02496e05f8ac032dfa82746e Mon Sep 17 00:00:00 2001
From: Jan Bujak
Date: Sun, 13 Mar 2022 10:36:05 +0000
Subject: [PATCH 1/2] Parallelize `predict` subcommand

---
 crates/cli/predict.rs | 47 ++++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/crates/cli/predict.rs b/crates/cli/predict.rs
index 7ed782d0..858828c7 100644
--- a/crates/cli/predict.rs
+++ b/crates/cli/predict.rs
@@ -2,6 +2,7 @@ use crate::PredictArgs;
 use anyhow::Result;
 use either::Either;
 use itertools::Itertools;
+use rayon::prelude::*;
 use tangram_core::predict::{PredictInput, PredictInputValue, PredictOptions};
 use tangram_zip::zip;
 
@@ -69,24 +70,34 @@ pub fn predict(args: PredictArgs) -> Result<()> {
 		}
 	};
 	let header = reader.headers()?.to_owned();
-	for records in &reader.records().chunks(PREDICT_CHUNK_SIZE) {
-		let input: Vec<_> = records
-			.into_iter()
-			.map(|record| -> Result<_> {
-				let record = record?;
-				let input = zip!(header.iter(), record.into_iter())
-					.map(|(column_name, value)| {
-						(
-							column_name.to_owned(),
-							PredictInputValue::String(value.to_owned()),
-						)
-					})
-					.collect();
-				Ok(PredictInput(input))
-			})
-			.collect::<Result<_>>()?;
-		let output = tangram_core::predict::predict(&model, &input, &options);
-		for output in output {
+	let mut record_chunks: Vec<Vec<_>> = Vec::new();
+	for record_chunk in &reader.records().chunks(PREDICT_CHUNK_SIZE) {
+		let record_chunk = record_chunk.collect();
+		record_chunks.push(record_chunk);
+	}
+	let output_chunks: Vec<Result<_, _>> = record_chunks
+		.into_par_iter()
+		.map(|records| {
+			let input: Result<Vec<_>, _> = records
+				.into_iter()
+				.map(|record| -> Result<_> {
+					let record = record?;
+					let input = zip!(header.iter(), record.into_iter())
+						.map(|(column_name, value)| {
+							(
+								column_name.to_owned(),
+								PredictInputValue::String(value.to_owned()),
+							)
+						})
+						.collect();
+					Ok(PredictInput(input))
+				})
+				.collect();
+			input.map(|input| tangram_core::predict::predict(&model, &input, &options))
+		})
+		.collect();
+	for outputs in output_chunks {
+		for output in outputs? {
 			let output = match output {
 				tangram_core::predict::PredictOutput::Regression(output) => {
 					vec![output.value.to_string()]

From 39b60bba487dd2af6b75fe519e1e28ef6f00538a Mon Sep 17 00:00:00 2001
From: Jan Bujak
Date: Sun, 13 Mar 2022 16:28:52 +0000
Subject: [PATCH 2/2] Do not buffer the whole CSV file when running `predict`

---
 Cargo.lock            |   2 +
 crates/cli/Cargo.toml |   2 +
 crates/cli/predict.rs | 118 ++++++++++++++++++++++++++++++++----------
 3 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 214d9de9..464623f9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4472,6 +4472,7 @@ dependencies = [
  "bytes",
  "clap 3.1.3",
  "colored",
+ "crossbeam-channel",
  "csv",
  "dirs",
  "either",
@@ -4480,6 +4481,7 @@
  "itertools",
  "libc",
  "num",
+ "num_cpus",
  "once_cell",
  "pretty_assertions",
  "rayon",
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
index 290e9174..81b3e209 100644
--- a/crates/cli/Cargo.toml
+++ b/crates/cli/Cargo.toml
@@ -22,12 +22,14 @@ backtrace = "0.3"
 bytes = { version = "1", optional = true }
 clap = { version = "3", features = ["derive"] }
 colored = "2"
+crossbeam-channel = "0.5"
 csv = "1"
 dirs = "4"
 either = "1"
 hyper = { version = "0.14", optional = true }
 itertools = "0.10"
 num = "0.4"
+num_cpus = "1"
 once_cell = "1"
 rayon = "1.5"
 serde = { version = "1", features = ["derive"] }
diff --git a/crates/cli/predict.rs b/crates/cli/predict.rs
index 858828c7..9e3ad61a 100644
--- a/crates/cli/predict.rs
+++ b/crates/cli/predict.rs
@@ -1,8 +1,11 @@
 use crate::PredictArgs;
 use anyhow::Result;
+use crossbeam_channel::Sender;
+use csv::StringRecord;
 use either::Either;
 use itertools::Itertools;
-use rayon::prelude::*;
+use std::sync::Arc;
+use tangram_core::predict::PredictOutput;
 use tangram_core::predict::{PredictInput, PredictInputValue, PredictOptions};
 use tangram_zip::zip;
 
@@ -69,35 +72,89 @@ pub fn predict(args: PredictArgs) -> Result<()> {
 			}
 		}
 	};
+
 	let header = reader.headers()?.to_owned();
-	let mut record_chunks: Vec<Vec<_>> = Vec::new();
-	for record_chunk in &reader.records().chunks(PREDICT_CHUNK_SIZE) {
-		let record_chunk = record_chunk.collect();
-		record_chunks.push(record_chunk);
+	let chunk_count = num_cpus::get() * 2;
+	let (input_tx, input_rx): (
+		Sender<(
+			Vec<StringRecord>,
+			Sender<Result<Vec<PredictOutput>, anyhow::Error>>,
+		)>,
+		_,
+	) = crossbeam_channel::bounded(chunk_count);
+	let (output_tx, output_rx) = crossbeam_channel::bounded(chunk_count);
+
+	let header = Arc::new(header);
+	let model = Arc::new(model);
+	let options = Arc::new(options);
+
+	let mut threads = Vec::new();
+
+	for _ in 0..num_cpus::get() {
+		let header = header.clone();
+		let model = model.clone();
+		let options = options.clone();
+		let input_rx = input_rx.clone();
+
+		threads.push(std::thread::spawn(move || {
+			while let Ok((records, chunk_tx)) = input_rx.recv() {
+				let input: Result<Vec<_>, _> = records
+					.into_iter()
+					.map(|record| -> Result<_> {
+						let input = zip!(header.iter(), record.into_iter())
+							.map(|(column_name, value)| {
+								(
+									column_name.to_owned(),
+									PredictInputValue::String(value.to_owned()),
+								)
+							})
+							.collect();
+						Ok(PredictInput(input))
+					})
+					.collect();
+
+				let output =
+					input.map(|input| tangram_core::predict::predict(&model, &input, &options));
+
+				if chunk_tx.send(output).is_err() {
+					break;
+				};
+			}
+		}));
 	}
-	let output_chunks: Vec<Result<_, _>> = record_chunks
-		.into_par_iter()
-		.map(|records| {
-			let input: Result<Vec<_>, _> = records
-				.into_iter()
-				.map(|record| -> Result<_> {
-					let record = record?;
-					let input = zip!(header.iter(), record.into_iter())
-						.map(|(column_name, value)| {
-							(
-								column_name.to_owned(),
-								PredictInputValue::String(value.to_owned()),
-							)
-						})
-						.collect();
-					Ok(PredictInput(input))
-				})
-				.collect();
-			input.map(|input| tangram_core::predict::predict(&model, &input, &options))
-		})
-		.collect();
-	for outputs in output_chunks {
-		for output in outputs? {
+
+	threads.push(std::thread::spawn(move || {
+		for records_chunk in &reader.records().chunks(PREDICT_CHUNK_SIZE) {
+			let records_chunk: Result<Vec<_>, _> = records_chunk.collect();
+			let records_chunk = match records_chunk {
+				Ok(records_chunk) => records_chunk,
+				Err(error) => {
+					let error: anyhow::Error = error.into();
+					let _ = output_tx.send(Err(error));
+					break;
+				}
+			};
+
+			// Here we create a single use channel which will allow the CSV writer
+			// to wait for the prediction results in-order while still allowing
+			// the prediction for future chunks to run in parallel.
+			let (chunk_tx, chunk_rx) = crossbeam_channel::bounded(1);
+			if let Err(error) = input_tx.send((records_chunk, chunk_tx)) {
+				let error: anyhow::Error = error.into();
+				let _ = output_tx.send(Err(error));
+				break;
+			}
+			if output_tx.send(Ok(chunk_rx)).is_err() {
+				break;
+			}
+		}
+	}));
+
+	while let Ok(output) = output_rx.recv() {
+		let chunk_rx = output?;
+		let outputs = chunk_rx.recv()??;
+
+		for output in outputs {
 			let output = match output {
 				tangram_core::predict::PredictOutput::Regression(output) => {
 					vec![output.value.to_string()]
@@ -140,5 +197,10 @@ pub fn predict(args: PredictArgs) -> Result<()> {
 			writer.write_record(&output)?;
 		}
 	}
+
+	for thread in threads {
+		thread.join().unwrap();
+	}
+
 	Ok(())
 }
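
The trick that keeps the CSV output in input order in the second patch is the per-chunk reply channel: the producer pairs every chunk of records with a single-use bounded(1) channel, workers answer on that channel, and the writer drains the receiving ends in the order the chunks were submitted, so later chunks keep being predicted in parallel while an earlier one is written out. Below is a minimal, self-contained sketch of that fan-out/fan-in pattern, assuming only the `crossbeam-channel` and `num_cpus` crates; `process_chunk`, `CHUNK_SIZE`, and the input data are illustrative stand-ins, not names from the patch.

// Sketch of the ordered fan-out/fan-in used by the patch: workers may finish
// chunks out of order, but the consumer reads the per-chunk reply channels in
// submission order, so results come out ordered without any sorting.
use crossbeam_channel::{bounded, Receiver, Sender};

const CHUNK_SIZE: usize = 8;

// Stand-in for the per-chunk work (the prediction step in the patch).
fn process_chunk(chunk: Vec<u64>) -> Vec<u64> {
	chunk.into_iter().map(|x| x * x).collect()
}

fn main() {
	let chunk_count = num_cpus::get() * 2;
	// Producer -> workers: each message is a chunk plus its single-use reply channel.
	let (input_tx, input_rx): (Sender<(Vec<u64>, Sender<Vec<u64>>)>, _) = bounded(chunk_count);
	// Producer -> consumer: the reply receivers, queued in submission order.
	let (output_tx, output_rx): (Sender<Receiver<Vec<u64>>>, _) = bounded(chunk_count);

	let mut threads = Vec::new();

	// Workers: pull chunks in any order and answer on the chunk's own channel.
	for _ in 0..num_cpus::get() {
		let input_rx = input_rx.clone();
		threads.push(std::thread::spawn(move || {
			while let Ok((chunk, chunk_tx)) = input_rx.recv() {
				let _ = chunk_tx.send(process_chunk(chunk));
			}
		}));
	}

	// Producer: split the input into chunks, attach a bounded(1) reply channel
	// to each, and queue the receivers for the consumer in order.
	threads.push(std::thread::spawn(move || {
		let data: Vec<u64> = (0..100).collect();
		for chunk in data.chunks(CHUNK_SIZE) {
			let (chunk_tx, chunk_rx) = bounded(1);
			if input_tx.send((chunk.to_vec(), chunk_tx)).is_err() {
				break;
			}
			if output_tx.send(chunk_rx).is_err() {
				break;
			}
		}
	}));

	// Consumer: results arrive in the same order the chunks were sent, even
	// though the workers processed them in parallel.
	while let Ok(chunk_rx) = output_rx.recv() {
		if let Ok(outputs) = chunk_rx.recv() {
			for output in outputs {
				println!("{output}");
			}
		}
	}

	for thread in threads {
		thread.join().unwrap();
	}
}

The bounded capacities on both queues also give the pipeline backpressure, which is what lets the second patch stop buffering the whole CSV file: at most chunk_count chunks are in flight at any time.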