Skip to main content

revmc_runtime/runtime/
worker.rs

//! Background JIT and AOT compilation workers.
//!
//! Compilation jobs run on a rayon thread pool. Each worker thread lazily
//! creates long-lived `EvmCompiler` instances in thread-local storage — one
//! for JIT jobs and one for AOT jobs — each tied to that thread's LLVM
//! context. When `JitMode::OutOfProcess` is configured, JIT jobs are
//! delegated to a helper process instead.
//!
//! With ORCv2, JIT code lifetime is managed per-module via `ResourceTracker`s.
//! Each successful JIT compilation extracts the committed module's tracker and
//! returns it as a [`JitCodeBacking`], which frees the machine code on drop.
//! Workers therefore do not need to stay alive for code lifetime: they exit
//! once the pool is shut down and any already-queued jobs have run.
11
12use crate::{
13    CompileTimings, EvmCompilerFn, OptimizationLevel,
14    runtime::{
15        config::{CompilationKind, RuntimeConfig},
16        stats::RuntimeStats,
17        storage::RuntimeCacheKey,
18    },
19};
20use alloy_primitives::Bytes;
21use crossbeam_channel as chan;
22use rayon::{ThreadPool, ThreadPoolBuilder};
23#[cfg(feature = "llvm")]
24use std::{cell::RefCell, fs::File, io::Read, time::Instant};
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33#[cfg(feature = "llvm")]
34use crate::{
35    EvmCompiler, EvmLlvmBackend, Linker,
36    llvm::{JitDylibGuard, orc::ResourceTracker},
37    runtime::config::JitMode,
38};
39
/// Shared handle to the out-of-process JIT helper.
///
/// `Some` only when the runtime is configured with `JitMode::OutOfProcess`.
#[cfg(all(feature = "llvm", unix))]
type OutOfProcessHelper = Option<Arc<super::out_of_process::HelperProcess>>;
/// Placeholder for builds where out-of-process JIT is not available.
#[cfg(not(all(feature = "llvm", unix)))]
type OutOfProcessHelper = ();
44
45/// Notifier for synchronous compilation requests.
46///
47/// Wraps an optional sender that is notified when the compilation
48/// completes (success or failure). Passed from the backend through
49/// the worker and back, then fired after the result is processed.
50pub(crate) struct SyncNotifier(Option<chan::Sender<()>>);
51
52impl SyncNotifier {
53    pub(crate) fn none() -> Self {
54        Self(None)
55    }
56
57    pub(crate) fn new(tx: chan::Sender<()>) -> Self {
58        Self(Some(tx))
59    }
60
61    pub(crate) fn notify(self) {
62        if let Some(tx) = self.0 {
63            let _ = tx.send(());
64        }
65    }
66}
67
68/// A compilation job sent from the backend to a worker.
69#[derive(derive_more::Debug)]
70pub(crate) struct CompileJob {
71    /// Whether this job compiles JIT or AOT output.
72    pub(crate) kind: CompilationKind,
73    /// The key to compile for.
74    pub(crate) key: RuntimeCacheKey,
75    /// The raw bytecode to compile.
76    pub(crate) bytecode: Bytes,
77    /// The symbol name to use for the compiled function.
78    pub(crate) symbol_name: String,
79    /// Optimization level for compilation.
80    pub(crate) opt_level: OptimizationLevel,
81    /// Optional notifier for synchronous callers.
82    #[debug(skip)]
83    pub(crate) sync_notifier: SyncNotifier,
84    /// Generation at the time the job was dispatched.
85    pub(crate) generation: u64,
86}
87
88/// Result of a compilation attempt, sent back from a worker to the backend.
89pub(crate) struct WorkerResult {
90    /// The key that was compiled.
91    pub(crate) key: RuntimeCacheKey,
92    /// The compilation outcome.
93    pub(crate) outcome: Result<WorkerSuccess, String>,
94    /// Whether this was a JIT or AOT compilation job.
95    pub(crate) kind: CompilationKind,
96    /// Optional notifier for synchronous callers, passed through from the job.
97    pub(crate) sync_notifier: SyncNotifier,
98    /// Generation at the time the job was dispatched.
99    pub(crate) generation: u64,
100    /// Wall-clock time spent compiling.
101    pub(crate) compile_duration: Duration,
102    /// Per-phase timing breakdown from the compiler.
103    pub(crate) timings: CompileTimings,
104}
105
106/// Successful compilation output.
107pub(crate) enum WorkerSuccess {
108    /// JIT compilation produced an in-memory function pointer.
109    Jit(JitSuccess),
110    /// AOT compilation produced shared-library bytes.
111    Aot(AotSuccess),
112    /// JIT compilation produced relocatable object bytes to link in the parent.
113    JitObject(JitObjectSuccess),
114}
115
116/// Successful JIT compilation output.
117pub(crate) struct JitSuccess {
118    /// The compiled function pointer.
119    pub(crate) func: EvmCompilerFn,
120    /// Owns the JIT machine code via an ORCv2 `ResourceTracker`.
121    /// Dropping this frees the compiled code.
122    pub(crate) backing: Arc<JitCodeBacking>,
123}
124
125/// Successful AOT compilation output.
126pub(crate) struct JitObjectSuccess {
127    /// The symbol name in the object file.
128    pub(crate) symbol_name: String,
129    /// The raw relocatable object bytes.
130    pub(crate) object_bytes: Bytes,
131    /// Builtin absolute symbols referenced by the object.
132    pub(crate) builtin_symbols: Vec<String>,
133}
134
/// Successful AOT compilation output: a linked shared library.
pub(crate) struct AotSuccess {
    /// The symbol name of the compiled function in the shared library.
    pub(crate) symbol_name: String,
    /// The raw shared-library bytes (.so / .dylib).
    pub(crate) dylib_bytes: Vec<u8>,
    /// Length of the original bytecode.
    pub(crate) bytecode_len: usize,
}
143
/// Keeps one entry's JIT-compiled machine code alive.
///
/// Pairs an ORCv2 `ResourceTracker`, which owns this entry's code, with a
/// [`JitDylibGuard`], which keeps the owning `JITDylib` from being cleared or
/// recycled while any guard exists.
///
/// Dropping the backing calls `tracker.remove()` to free the code. This is
/// what makes per-entry eviction work: once a `CompiledProgram` leaves the
/// resident map and every `Arc<CompiledProgram>` handle is released, its
/// machine code is reclaimed.
pub(crate) struct JitCodeBacking {
    /// Owner of this entry's machine code. `Drop` removes it while
    /// `_jd_guard` is still alive, so the `JITDylib` remains valid while its
    /// resources are released. Keep this field declared before `_jd_guard`.
    #[cfg(feature = "llvm")]
    tracker: Option<ResourceTracker>,
    /// Keeps the owning `JITDylib` alive; dropped after `tracker`.
    #[cfg(feature = "llvm")]
    _jd_guard: Arc<JitDylibGuard>,
}

impl JitCodeBacking {
    /// Wraps `tracker` together with the guard of the `JITDylib` it belongs to.
    #[cfg(feature = "llvm")]
    pub(crate) fn new(tracker: ResourceTracker, jd_guard: Arc<JitDylibGuard>) -> Self {
        let tracker = Some(tracker);
        Self { _jd_guard: jd_guard, tracker }
    }
}

#[cfg(feature = "llvm")]
impl Drop for JitCodeBacking {
    fn drop(&mut self) {
        let Some(tracker) = self.tracker.take() else {
            return;
        };
        // Failure to free the code is logged rather than propagated: `drop`
        // cannot report errors.
        if let Err(e) = tracker.remove() {
            warn!("failed to remove JIT code: {e}");
        }
    }
}
183
184/// Handle to the worker pool.
185pub(crate) struct WorkerPool {
186    /// Rayon pool used to execute compilation jobs.
187    pool: Option<ThreadPool>,
188    /// Out-of-process helper used by this worker pool.
189    out_of_process_helper: OutOfProcessHelper,
190    /// Sender for worker results.
191    result_tx: chan::Sender<WorkerResult>,
192    /// Runtime configuration shared by spawned jobs.
193    config: Arc<RuntimeConfig>,
194    /// Number of queued jobs waiting to start.
195    queued: Arc<AtomicUsize>,
196    /// Maximum queued jobs accepted by the pool.
197    queue_capacity: usize,
198    /// Signals workers to stop accepting new work.
199    shutdown: Arc<AtomicBool>,
200}
201
202impl WorkerPool {
203    /// Creates and starts the worker pool.
204    pub(crate) fn new(
205        result_tx: chan::Sender<WorkerResult>,
206        config: RuntimeConfig,
207        stats: Arc<RuntimeStats>,
208    ) -> Self {
209        let worker_count = config.tuning.jit_worker_count;
210        let queue_capacity = worker_count.saturating_mul(config.tuning.jit_worker_queue_capacity);
211        let out_of_process_helper = create_out_of_process_helper(&config, stats);
212        let pool = (worker_count > 0).then(|| {
213            ThreadPoolBuilder::new()
214                .num_threads(worker_count)
215                .thread_name(|i| format!("revmc-{i:02}"))
216                .exit_handler(|_| clear_thread_local_compilers())
217                .build()
218                .expect("failed to spawn compile workers")
219        });
220
221        Self {
222            pool,
223            out_of_process_helper,
224            result_tx,
225            config: Arc::new(config),
226            queued: Arc::new(AtomicUsize::new(0)),
227            queue_capacity,
228            shutdown: Arc::new(AtomicBool::new(false)),
229        }
230    }
231
232    /// Tries to send a job to the worker pool.
233    /// Returns the job back on failure (queue full or shut down).
234    pub(crate) fn try_send(&mut self, job: CompileJob) -> Result<(), CompileJob> {
235        if self.pool.is_none() || self.shutdown.load(Ordering::Acquire) {
236            return Err(job);
237        }
238
239        let mut current = self.queued.load(Ordering::Acquire);
240        loop {
241            if current >= self.queue_capacity {
242                return Err(job);
243            }
244            match self.queued.compare_exchange_weak(
245                current,
246                current + 1,
247                Ordering::AcqRel,
248                Ordering::Acquire,
249            ) {
250                Ok(_) => break,
251                Err(next) => current = next,
252            }
253        }
254
255        let pool = self.pool.as_ref().unwrap();
256        let queued = self.queued.clone();
257        let result_tx = self.result_tx.clone();
258        let config = self.config.clone();
259        let out_of_process_helper = self.out_of_process_helper.clone();
260        pool.spawn_fifo(move || {
261            queued.fetch_sub(1, Ordering::AcqRel);
262            let result = compile_job(job, &config, out_of_process_helper);
263            let _ = result_tx.send(result);
264        });
265        Ok(())
266    }
267
268    /// Shuts down all workers after draining queued jobs.
269    pub(crate) fn shutdown(&mut self) {
270        self.shutdown.store(true, Ordering::Release);
271        self.cancel_in_flight();
272        if let Some(pool) = &self.pool {
273            pool.broadcast(|_| clear_thread_local_compilers());
274        }
275        self.pool.take();
276    }
277
278    /// Cancels any in-flight compilation that can be interrupted externally.
279    pub(crate) fn cancel_in_flight(&self) {
280        cancel_out_of_process_helper(&self.out_of_process_helper);
281    }
282
283    /// Pauses out-of-process helper execution.
284    pub(crate) fn pause(&self) {
285        pause_out_of_process_helper(&self.out_of_process_helper);
286    }
287
288    /// Resumes out-of-process helper execution.
289    pub(crate) fn resume(&self) {
290        resume_out_of_process_helper(&self.out_of_process_helper);
291    }
292}
293
294impl Drop for WorkerPool {
295    fn drop(&mut self) {
296        self.shutdown();
297    }
298}
299
300#[cfg(all(feature = "llvm", unix))]
301fn create_out_of_process_helper(
302    config: &RuntimeConfig,
303    stats: Arc<RuntimeStats>,
304) -> OutOfProcessHelper {
305    (config.jit_mode == JitMode::OutOfProcess)
306        .then(|| Arc::new(super::out_of_process::HelperProcess::new(stats)))
307}
308
309#[cfg(not(all(feature = "llvm", unix)))]
310fn create_out_of_process_helper(
311    _config: &RuntimeConfig,
312    _stats: Arc<RuntimeStats>,
313) -> OutOfProcessHelper {
314}
315
316#[cfg(all(feature = "llvm", unix))]
317fn cancel_out_of_process_helper(helper: &OutOfProcessHelper) {
318    if let Some(helper) = helper {
319        helper.cancel_in_flight();
320    }
321}
322
323#[cfg(not(all(feature = "llvm", unix)))]
324fn cancel_out_of_process_helper(_helper: &OutOfProcessHelper) {}
325
326#[cfg(all(feature = "llvm", unix))]
327fn pause_out_of_process_helper(helper: &OutOfProcessHelper) {
328    if let Some(helper) = helper {
329        helper.pause();
330    }
331}
332
333#[cfg(not(all(feature = "llvm", unix)))]
334fn pause_out_of_process_helper(_helper: &OutOfProcessHelper) {}
335
336#[cfg(all(feature = "llvm", unix))]
337fn resume_out_of_process_helper(helper: &OutOfProcessHelper) {
338    if let Some(helper) = helper {
339        helper.resume();
340    }
341}
342
343#[cfg(not(all(feature = "llvm", unix)))]
344fn resume_out_of_process_helper(_helper: &OutOfProcessHelper) {}
345
/// Drops the calling thread's cached JIT and AOT compiler states, if any.
///
/// Runs when a worker thread exits and once per worker during shutdown.
#[cfg(feature = "llvm")]
fn clear_thread_local_compilers() {
    for slot in [&JIT_COMPILER, &AOT_COMPILER] {
        slot.with_borrow_mut(Option::take);
    }
}

/// Without the `llvm` feature no compilers are cached, so there is nothing to clear.
#[cfg(not(feature = "llvm"))]
fn clear_thread_local_compilers() {}
354
/// Runs one job on the current worker thread and packages its outcome.
///
/// JIT jobs go to the out-of-process helper when `JitMode::OutOfProcess` is
/// configured. Otherwise JIT and AOT jobs each use this thread's dedicated
/// cached compiler.
#[cfg(feature = "llvm")]
fn compile_job(
    job: CompileJob,
    config: &RuntimeConfig,
    out_of_process_helper: OutOfProcessHelper,
) -> WorkerResult {
    trace!(?job, "received job");
    let (slot, target) = match job.kind {
        CompilationKind::Jit if config.jit_mode == JitMode::OutOfProcess => {
            return compile_out_of_process_job(job, config, out_of_process_helper);
        }
        CompilationKind::Jit => (&JIT_COMPILER, CompilerTarget::Jit),
        CompilationKind::Aot => (&AOT_COMPILER, CompilerTarget::Aot),
    };
    slot.with_borrow_mut(|state| compile_with_state(job, config, target, state))
}
372
/// Delegates a JIT job to the out-of-process helper.
///
/// # Panics
///
/// Panics if no helper exists. `create_out_of_process_helper` creates one
/// whenever `JitMode::OutOfProcess` is configured, and this module only calls
/// this function in that mode.
#[cfg(all(feature = "llvm", unix))]
fn compile_out_of_process_job(
    job: CompileJob,
    config: &RuntimeConfig,
    helper: OutOfProcessHelper,
) -> WorkerResult {
    match helper {
        Some(process) => super::out_of_process::compile_job(job, config, &process),
        None => panic!("missing out-of-process JIT helper"),
    }
}

/// Out-of-process JIT requires Unix; on other platforms such jobs fail immediately.
#[cfg(all(feature = "llvm", not(unix)))]
fn compile_out_of_process_job(
    job: CompileJob,
    _config: &RuntimeConfig,
    _helper: OutOfProcessHelper,
) -> WorkerResult {
    let CompileJob { key, kind, sync_notifier, generation, .. } = job;
    WorkerResult {
        key,
        kind,
        generation,
        sync_notifier,
        outcome: Err(String::from("out-of-process JIT is only available on Unix")),
        compile_duration: Duration::ZERO,
        timings: CompileTimings::default(),
    }
}
399
/// Kind of artifact a cached compiler is built to produce.
#[cfg(feature = "llvm")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum CompilerTarget {
    /// In-process JIT: code is loaded into this process's ORC JIT.
    Jit,
    /// Relocatable object bytes, linked by the parent process.
    JitObject,
    /// Shared library produced via the AOT pipeline.
    Aot,
}

#[cfg(feature = "llvm")]
impl CompilerTarget {
    /// Whether the compiler for this target is created with
    /// `EvmLlvmBackend::new(true)` (the AOT backend).
    const fn is_aot_backend(self) -> bool {
        match self {
            Self::Jit => false,
            Self::JitObject | Self::Aot => true,
        }
    }
}
414
415#[cfg(feature = "llvm")]
416pub(super) fn compile_with_state(
417    job: CompileJob,
418    config: &RuntimeConfig,
419    target: CompilerTarget,
420    state_slot: &mut Option<CompilerState>,
421) -> WorkerResult {
422    let _span = match job.kind {
423        CompilationKind::Jit => {
424            debug_span!("jit_compile", hash=%job.key.code_hash, spec_id=?job.key.spec_id).entered()
425        }
426        CompilationKind::Aot => {
427            debug_span!("aot_compile", hash=%job.key.code_hash, spec_id=?job.key.spec_id).entered()
428        }
429    };
430    let t0 = Instant::now();
431
432    if state_slot.is_none() {
433        match CompilerState::new(config, target) {
434            Ok(s) => *state_slot = Some(s),
435            Err(e) => {
436                error!(error = %e, "failed to create LLVM backend");
437                return WorkerResult {
438                    key: job.key,
439                    outcome: Err(e),
440                    kind: job.kind,
441                    sync_notifier: job.sync_notifier,
442                    generation: job.generation,
443                    compile_duration: t0.elapsed(),
444                    timings: CompileTimings::default(),
445                };
446            }
447        }
448    }
449
450    let state = state_slot.as_mut().unwrap();
451    let compiler = &mut state.compiler;
452
453    if job.kind == CompilationKind::Jit
454        && let Some(base) = &config.dump_dir
455    {
456        let dir =
457            base.join(format!("{:?}", job.key.spec_id)).join(format!("{}", job.key.code_hash));
458        compiler.set_dump_to(Some(dir));
459    }
460    compiler.set_opt_level(job.opt_level);
461
462    let outcome = match target {
463        CompilerTarget::Jit => compile_jit_artifact(&job, compiler),
464        CompilerTarget::JitObject => compile_jit_object_artifact(&job, compiler),
465        CompilerTarget::Aot => compile_aot_artifact(&job, compiler),
466    };
467    let timings = compiler.take_timings();
468
469    if let Err(err) = compiler.clear_ir() {
470        warn!(%err, "clear_ir failed");
471    }
472
473    state.compilations_since_recycle += 1;
474    if config.tuning.compiler_recycle_threshold > 0
475        && state.compilations_since_recycle >= config.tuning.compiler_recycle_threshold
476    {
477        debug!(compilations_since_recycle = state.compilations_since_recycle, "recycling compiler");
478        match CompilerState::new(config, target) {
479            Ok(new_state) => {
480                *state_slot = Some(new_state);
481                revmc_llvm::global_gc();
482            }
483            Err(e) => {
484                error!(error = %e, "failed to recreate compiler");
485                state.compilations_since_recycle = 0;
486            }
487        }
488    }
489
490    WorkerResult {
491        key: job.key,
492        outcome,
493        kind: job.kind,
494        sync_notifier: job.sync_notifier,
495        generation: job.generation,
496        compile_duration: t0.elapsed(),
497        timings,
498    }
499}
500
/// A cached per-thread compiler plus the number of jobs it has compiled since
/// it was created; the count decides when the compiler is recycled.
#[cfg(feature = "llvm")]
pub(super) struct CompilerState {
    compiler: EvmCompiler<EvmLlvmBackend>,
    compilations_since_recycle: usize,
}

#[cfg(feature = "llvm")]
impl CompilerState {
    /// Builds a fresh compiler for `target` with a zeroed compilation counter.
    ///
    /// # Errors
    ///
    /// Propagates the message from `create_compiler` if the backend cannot be created.
    fn new(config: &RuntimeConfig, target: CompilerTarget) -> Result<Self, String> {
        let compiler = create_compiler(config, target.is_aot_backend())?;
        Ok(Self { compiler, compilations_since_recycle: 0 })
    }
}
516
#[cfg(feature = "llvm")]
thread_local! {
    /// This thread's cached compiler for in-process JIT jobs (`None` until first use).
    static JIT_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
    /// This thread's cached compiler for AOT jobs (`None` until first use).
    static AOT_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
}
522
/// Creates an `EvmCompiler` on a new LLVM backend, configured from `config`.
///
/// `aot` is forwarded to `EvmLlvmBackend::new`. It also selects
/// `aot_opt_level` instead of `jit_opt_level` as the default optimization
/// level.
///
/// # Errors
///
/// Returns the backend's error message if the LLVM backend cannot be created.
#[cfg(feature = "llvm")]
pub(super) fn create_compiler(
    config: &RuntimeConfig,
    aot: bool,
) -> Result<EvmCompiler<EvmLlvmBackend>, String> {
    let backend = match EvmLlvmBackend::new(aot) {
        Ok(backend) => backend,
        Err(e) => return Err(e.to_string()),
    };
    let default_opt_level =
        if aot { config.tuning.aot_opt_level } else { config.tuning.jit_opt_level };

    let mut compiler = EvmCompiler::new(backend);
    compiler.set_opt_level(default_opt_level);
    compiler.debug_assertions(config.debug_assertions);
    compiler.single_error(config.single_error);
    compiler.set_dedup(!config.no_dedup);
    compiler.set_dse(!config.no_dse);
    if let Some(gas_params) = config.gas_params.as_ref() {
        compiler.set_gas_params(gas_params.clone());
    }
    Ok(compiler)
}
544
/// JIT-compiles `job` in process and takes ownership of the emitted code.
///
/// On success, returns the function pointer together with a
/// [`JitCodeBacking`] that frees the machine code once the last `Arc` to it
/// is dropped.
///
/// # Errors
///
/// Returns a message if compilation fails, or if the backend exposes no
/// `ResourceTracker` for the new module. The latter is an error rather than
/// a panic because a panic inside a rayon job aborts the process when the
/// pool has no panic handler.
#[cfg(feature = "llvm")]
fn compile_jit_artifact(
    job: &CompileJob,
    compiler: &mut EvmCompiler<EvmLlvmBackend>,
) -> Result<WorkerSuccess, String> {
    // SAFETY: NOTE(review) — the exact contract of `EvmCompiler::jit` is not
    // visible here. The returned function pointer is only valid while its
    // machine code is alive, which is why it is handed out together with a
    // `JitCodeBacking` below.
    let func = unsafe { compiler.jit(&job.symbol_name, &job.bytecode[..], job.key.spec_id) }
        .map_err(|err| {
            warn!(%err, "JIT compilation failed");
            err.to_string()
        })?;

    let backend = compiler.backend_mut();
    let jd_guard = backend.jit_dylib_guard();
    let tracker = backend.take_last_resource_tracker().ok_or_else(|| {
        error!("no ResourceTracker after JIT");
        String::from("no ResourceTracker after JIT")
    })?;
    debug!("JIT compilation succeeded");

    let backing = Arc::new(JitCodeBacking::new(tracker, jd_guard));
    Ok(WorkerSuccess::Jit(JitSuccess { func, backing }))
}
568
/// Compiles a single bytecode into a relocatable object file.
///
/// The object is meant to be linked by the parent process rather than loaded
/// into this process's JIT. Along with the object bytes, this returns the
/// names of the builtin symbols the object refers to. If the backend reports
/// no pending symbols, the names of all `revmc_builtins::Builtin::ALL` entries
/// are returned instead (NOTE(review): presumably a conservative fallback —
/// confirm).
///
/// # Errors
///
/// Returns a message if translation or writing the object fails.
#[cfg(feature = "llvm")]
pub(super) fn compile_jit_object_artifact(
    job: &CompileJob,
    compiler: &mut EvmCompiler<EvmLlvmBackend>,
) -> Result<WorkerSuccess, String> {
    compiler
        .translate(&job.symbol_name, &job.bytecode[..], job.key.spec_id)
        .map_err(|e| format!("JIT object translate failed: {e}"))?;

    let mut object_bytes = Vec::new();
    compiler
        .write_object(&mut object_bytes)
        .map_err(|e| format!("JIT object write failed: {e}"))?;

    let pending: Vec<String> = compiler
        .backend()
        .pending_symbol_names()
        .into_iter()
        .map(|name| name.to_string_lossy().into_owned())
        .collect();
    let builtin_symbols = if pending.is_empty() {
        revmc_builtins::Builtin::ALL.iter().map(|builtin| builtin.name().to_string()).collect()
    } else {
        pending
    };

    debug!(
        bytecode_len = job.bytecode.len(),
        object_len = object_bytes.len(),
        "JIT object compilation succeeded",
    );

    Ok(WorkerSuccess::JitObject(JitObjectSuccess {
        symbol_name: job.symbol_name.clone(),
        object_bytes: Bytes::from(object_bytes),
        builtin_symbols,
    }))
}
606
/// AOT-compiles `job` into a shared library and returns its bytes.
///
/// The object file and the linked library are written to a temporary
/// directory, which is deleted when this function returns.
///
/// # Errors
///
/// Returns a message if translation, object emission, linking, or reading
/// the linked library fails. Also returns an error if the temporary object
/// path is not valid UTF-8, instead of panicking (a panic inside a rayon job
/// aborts the process when the pool has no panic handler).
#[cfg(feature = "llvm")]
fn compile_aot_artifact(
    job: &CompileJob,
    compiler: &mut EvmCompiler<EvmLlvmBackend>,
) -> Result<WorkerSuccess, String> {
    compiler
        .translate(&job.symbol_name, &job.bytecode[..], job.key.spec_id)
        .map_err(|e| format!("AOT translate failed: {e}"))?;

    let tmp_dir = tempfile::tempdir().map_err(|e| format!("failed to create temp dir: {e}"))?;

    let obj_path = tmp_dir.path().join("a.o");
    let so_path = tmp_dir.path().join("a.so");

    compiler
        .write_object_to_file(&obj_path)
        .map_err(|e| format!("AOT write object failed: {e}"))?;

    // The linker takes string arguments; temp dirs may live under a non-UTF-8 path.
    let obj_path_str = obj_path
        .to_str()
        .ok_or_else(|| format!("AOT object path is not valid UTF-8: {}", obj_path.display()))?;

    let linker = Linker::new();
    linker.link(&so_path, [obj_path_str]).map_err(|e| format!("AOT link failed: {e}"))?;

    let mut dylib_bytes = Vec::new();
    File::open(&so_path)
        .and_then(|mut f| f.read_to_end(&mut dylib_bytes))
        .map_err(|e| format!("failed to read linked .so: {e}"))?;

    debug!(
        bytecode_len = job.bytecode.len(),
        dylib_len = dylib_bytes.len(),
        "AOT compilation succeeded",
    );

    Ok(WorkerSuccess::Aot(AotSuccess {
        symbol_name: job.symbol_name.clone(),
        dylib_bytes,
        bytecode_len: job.bytecode.len(),
    }))
}
647
648#[cfg(not(feature = "llvm"))]
649fn compile_job(
650    job: CompileJob,
651    _config: &RuntimeConfig,
652    _helper: OutOfProcessHelper,
653) -> WorkerResult {
654    WorkerResult {
655        key: job.key,
656        outcome: Err("LLVM backend not available".into()),
657        kind: job.kind,
658        sync_notifier: job.sync_notifier,
659        generation: job.generation,
660        compile_duration: Duration::ZERO,
661        timings: CompileTimings::default(),
662    }
663}