Skip to content

Various refactors to the codegen coordinator code (part 3) #144503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 21 additions & 28 deletions compiler/rustc_codegen_ssa/src/back/write.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::any::Any;
use std::assert_matches::assert_matches;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -372,8 +371,6 @@ pub struct CodegenContext<B: WriteBackendMethods> {
/// The incremental compilation session directory, or None if we are not
/// compiling incrementally
pub incr_comp_session_dir: Option<PathBuf>,
/// Channel back to the main control thread to send messages to
pub coordinator_send: Sender<Box<dyn Any + Send>>,
/// `true` if the codegen should be run in parallel.
///
/// Depends on [`ExtraBackendMethods::supports_parallel()`] and `-Zno_parallel_backend`.
Expand Down Expand Up @@ -1122,10 +1119,10 @@ fn start_executing_work<B: ExtraBackendMethods>(
autodiff_items: &[AutoDiffItem],
shared_emitter: SharedEmitter,
codegen_worker_send: Sender<CguMessage>,
coordinator_receive: Receiver<Box<dyn Any + Send>>,
coordinator_receive: Receiver<Message<B>>,
regular_config: Arc<ModuleConfig>,
allocator_config: Arc<ModuleConfig>,
tx_to_llvm_workers: Sender<Box<dyn Any + Send>>,
tx_to_llvm_workers: Sender<Message<B>>,
) -> thread::JoinHandle<Result<CompiledModules, ()>> {
let coordinator_send = tx_to_llvm_workers;
let sess = tcx.sess;
Expand Down Expand Up @@ -1153,7 +1150,7 @@ fn start_executing_work<B: ExtraBackendMethods>(
let coordinator_send2 = coordinator_send.clone();
let helper = jobserver::client()
.into_helper_thread(move |token| {
drop(coordinator_send2.send(Box::new(Message::Token::<B>(token))));
drop(coordinator_send2.send(Message::Token::<B>(token)));
})
.expect("failed to spawn helper thread");

Expand Down Expand Up @@ -1187,7 +1184,6 @@ fn start_executing_work<B: ExtraBackendMethods>(
remark: sess.opts.cg.remark.clone(),
remark_dir,
incr_comp_session_dir: sess.incr_comp_session_dir_opt().map(|r| r.clone()),
coordinator_send,
expanded_args: tcx.sess.expanded_args.clone(),
diag_emitter: shared_emitter.clone(),
output_filenames: Arc::clone(tcx.output_filenames(())),
Expand Down Expand Up @@ -1423,7 +1419,7 @@ fn start_executing_work<B: ExtraBackendMethods>(
let (item, _) =
work_items.pop().expect("queue empty - queue_full_enough() broken?");
main_thread_state = MainThreadState::Lending;
spawn_work(&cgcx, &mut llvm_start_time, item);
spawn_work(&cgcx, coordinator_send.clone(), &mut llvm_start_time, item);
}
}
} else if codegen_state == Completed {
Expand Down Expand Up @@ -1502,7 +1498,7 @@ fn start_executing_work<B: ExtraBackendMethods>(
MainThreadState::Idle => {
if let Some((item, _)) = work_items.pop() {
main_thread_state = MainThreadState::Lending;
spawn_work(&cgcx, &mut llvm_start_time, item);
spawn_work(&cgcx, coordinator_send.clone(), &mut llvm_start_time, item);
} else {
// There is no unstarted work, so let the main thread
// take over for a running worker. Otherwise the
Expand Down Expand Up @@ -1538,16 +1534,15 @@ fn start_executing_work<B: ExtraBackendMethods>(
while running_with_own_token < tokens.len()
&& let Some((item, _)) = work_items.pop()
{
spawn_work(&cgcx, &mut llvm_start_time, item);
spawn_work(&cgcx, coordinator_send.clone(), &mut llvm_start_time, item);
running_with_own_token += 1;
}
}

// Relinquish accidentally acquired extra tokens.
tokens.truncate(running_with_own_token);

let msg = coordinator_receive.recv().unwrap();
match *msg.downcast::<Message<B>>().ok().unwrap() {
match coordinator_receive.recv().unwrap() {
// Save the token locally and the next turn of the loop will use
// this to spawn a new unit of work, or it may get dropped
// immediately if we have no more work to spawn.
Expand Down Expand Up @@ -1769,6 +1764,7 @@ pub(crate) struct WorkerFatalError;

fn spawn_work<'a, B: ExtraBackendMethods>(
cgcx: &'a CodegenContext<B>,
coordinator_send: Sender<Message<B>>,
llvm_start_time: &mut Option<VerboseTimingGuard<'a>>,
work: WorkItem<B>,
) {
Expand All @@ -1782,7 +1778,7 @@ fn spawn_work<'a, B: ExtraBackendMethods>(
// Set up a destructor which will fire off a message that we're done as
// we exit.
struct Bomb<B: ExtraBackendMethods> {
coordinator_send: Sender<Box<dyn Any + Send>>,
coordinator_send: Sender<Message<B>>,
result: Option<Result<WorkItemResult<B>, FatalError>>,
}
impl<B: ExtraBackendMethods> Drop for Bomb<B> {
Expand All @@ -1794,11 +1790,11 @@ fn spawn_work<'a, B: ExtraBackendMethods>(
}
None => Message::WorkItem::<B> { result: Err(None) },
};
drop(self.coordinator_send.send(Box::new(msg)));
drop(self.coordinator_send.send(msg));
}
}

let mut bomb = Bomb::<B> { coordinator_send: cgcx.coordinator_send.clone(), result: None };
let mut bomb = Bomb::<B> { coordinator_send, result: None };

// Execute the work itself, and if it finishes successfully then flag
// ourselves as a success as well.
Expand Down Expand Up @@ -2003,7 +1999,7 @@ impl SharedEmitterMain {
}

pub struct Coordinator<B: ExtraBackendMethods> {
pub sender: Sender<Box<dyn Any + Send>>,
sender: Sender<Message<B>>,
future: Option<thread::JoinHandle<Result<CompiledModules, ()>>>,
// Only used for the Message type.
phantom: PhantomData<B>,
Expand All @@ -2020,7 +2016,7 @@ impl<B: ExtraBackendMethods> Drop for Coordinator<B> {
if let Some(future) = self.future.take() {
// If we haven't joined yet, signal to the coordinator that it should spawn no more
// work, and wait for worker threads to finish.
drop(self.sender.send(Box::new(Message::CodegenAborted::<B>)));
drop(self.sender.send(Message::CodegenAborted::<B>));
drop(future.join());
}
}
Expand Down Expand Up @@ -2079,7 +2075,7 @@ impl<B: ExtraBackendMethods> OngoingCodegen<B> {
pub(crate) fn codegen_finished(&self, tcx: TyCtxt<'_>) {
self.wait_for_signal_to_codegen_item();
self.check_for_errors(tcx.sess);
drop(self.coordinator.sender.send(Box::new(Message::CodegenComplete::<B>)));
drop(self.coordinator.sender.send(Message::CodegenComplete::<B>));
}

pub(crate) fn check_for_errors(&self, sess: &Session) {
Expand All @@ -2100,28 +2096,25 @@ impl<B: ExtraBackendMethods> OngoingCodegen<B> {
}

pub(crate) fn submit_codegened_module_to_llvm<B: ExtraBackendMethods>(
_backend: &B,
tx_to_llvm_workers: &Sender<Box<dyn Any + Send>>,
coordinator: &Coordinator<B>,
module: ModuleCodegen<B::Module>,
cost: u64,
) {
let llvm_work_item = WorkItem::Optimize(module);
drop(tx_to_llvm_workers.send(Box::new(Message::CodegenDone::<B> { llvm_work_item, cost })));
drop(coordinator.sender.send(Message::CodegenDone::<B> { llvm_work_item, cost }));
}

pub(crate) fn submit_post_lto_module_to_llvm<B: ExtraBackendMethods>(
_backend: &B,
tx_to_llvm_workers: &Sender<Box<dyn Any + Send>>,
coordinator: &Coordinator<B>,
module: CachedModuleCodegen,
) {
let llvm_work_item = WorkItem::CopyPostLtoArtifacts(module);
drop(tx_to_llvm_workers.send(Box::new(Message::CodegenDone::<B> { llvm_work_item, cost: 0 })));
drop(coordinator.sender.send(Message::CodegenDone::<B> { llvm_work_item, cost: 0 }));
}

pub(crate) fn submit_pre_lto_module_to_llvm<B: ExtraBackendMethods>(
_backend: &B,
tcx: TyCtxt<'_>,
tx_to_llvm_workers: &Sender<Box<dyn Any + Send>>,
coordinator: &Coordinator<B>,
module: CachedModuleCodegen,
) {
let filename = pre_lto_bitcode_filename(&module.name);
Expand All @@ -2135,10 +2128,10 @@ pub(crate) fn submit_pre_lto_module_to_llvm<B: ExtraBackendMethods>(
})
};
// Schedule the module to be loaded
drop(tx_to_llvm_workers.send(Box::new(Message::AddImportOnlyModule::<B> {
drop(coordinator.sender.send(Message::AddImportOnlyModule::<B> {
module_data: SerializedModule::FromUncompressedFile(mmap),
work_product: module.source,
})));
}));
}

fn pre_lto_bitcode_filename(module_name: &str) -> String {
Expand Down
16 changes: 4 additions & 12 deletions compiler/rustc_codegen_ssa/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
// These modules are generally cheap and won't throw off scheduling.
let cost = 0;
submit_codegened_module_to_llvm(
&backend,
&ongoing_codegen.coordinator.sender,
&ongoing_codegen.coordinator,
ModuleCodegen::new_allocator(llmod_id, module_llvm),
cost,
);
Expand Down Expand Up @@ -800,18 +799,12 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
// compilation hang on post-monomorphization errors.
tcx.dcx().abort_if_errors();

submit_codegened_module_to_llvm(
&backend,
&ongoing_codegen.coordinator.sender,
module,
cost,
);
submit_codegened_module_to_llvm(&ongoing_codegen.coordinator, module, cost);
}
CguReuse::PreLto => {
submit_pre_lto_module_to_llvm(
&backend,
tcx,
&ongoing_codegen.coordinator.sender,
&ongoing_codegen.coordinator,
CachedModuleCodegen {
name: cgu.name().to_string(),
source: cgu.previous_work_product(tcx),
Expand All @@ -820,8 +813,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
}
CguReuse::PostLto => {
submit_post_lto_module_to_llvm(
&backend,
&ongoing_codegen.coordinator.sender,
&ongoing_codegen.coordinator,
CachedModuleCodegen {
name: cgu.name().to_string(),
source: cgu.previous_work_product(tcx),
Expand Down