Skip to main content

revmc_runtime/runtime/
worker.rs

1//! Background JIT and AOT compilation workers.
2//!
3//! Each worker thread owns a long-lived `EvmCompiler` instance tied to its
4//! thread-local LLVM context.
5//!
6//! With ORCv2, JIT code lifetime is managed per-module via `ResourceTracker`s.
7//! Each successful JIT compilation extracts the committed module's tracker and
8//! returns it as a [`JitCodeBacking`], which frees the machine code on drop.
9//! Workers no longer need to stay alive for code lifetime — they exit as soon
10//! as the job channel closes.
11
12use crate::{
13    CompileTimings, EvmCompilerFn, OptimizationLevel,
14    runtime::{
15        config::{CompilationKind, RuntimeConfig},
16        storage::RuntimeCacheKey,
17    },
18};
19use alloy_primitives::Bytes;
20use crossbeam_channel as chan;
21use rayon::{ThreadPool, ThreadPoolBuilder};
22#[cfg(feature = "llvm")]
23use std::{cell::RefCell, fs::File, io::Read, time::Instant};
24use std::{
25    sync::{
26        Arc,
27        atomic::{AtomicBool, AtomicUsize, Ordering},
28    },
29    time::Duration,
30};
31
32#[cfg(feature = "llvm")]
33use crate::{
34    EvmCompiler, EvmLlvmBackend, Linker,
35    llvm::{JitDylibGuard, orc::ResourceTracker},
36};
37
38/// Notifier for synchronous compilation requests.
39///
40/// Wraps an optional sender that is notified when the compilation
41/// completes (success or failure). Passed from the backend through
42/// the worker and back, then fired after the result is processed.
43pub(crate) struct SyncNotifier(Option<chan::Sender<()>>);
44
45impl SyncNotifier {
46    pub(crate) fn none() -> Self {
47        Self(None)
48    }
49
50    pub(crate) fn new(tx: chan::Sender<()>) -> Self {
51        Self(Some(tx))
52    }
53
54    pub(crate) fn notify(self) {
55        if let Some(tx) = self.0 {
56            let _ = tx.send(());
57        }
58    }
59}
60
61/// A compilation job sent from the backend to a worker.
62#[derive(derive_more::Debug)]
63pub(crate) struct CompileJob {
64    /// Whether this job compiles JIT or AOT output.
65    pub(crate) kind: CompilationKind,
66    /// The key to compile for.
67    pub(crate) key: RuntimeCacheKey,
68    /// The raw bytecode to compile.
69    pub(crate) bytecode: Bytes,
70    /// The symbol name to use for the compiled function.
71    pub(crate) symbol_name: String,
72    /// Optimization level for compilation.
73    pub(crate) opt_level: OptimizationLevel,
74    /// Optional notifier for synchronous callers.
75    #[debug(skip)]
76    pub(crate) sync_notifier: SyncNotifier,
77    /// Generation at the time the job was dispatched.
78    pub(crate) generation: u64,
79}
80
81/// Result of a compilation attempt, sent back from a worker to the backend.
82pub(crate) struct WorkerResult {
83    /// The key that was compiled.
84    pub(crate) key: RuntimeCacheKey,
85    /// The compilation outcome.
86    pub(crate) outcome: Result<WorkerSuccess, String>,
87    /// Whether this was a JIT or AOT compilation job.
88    pub(crate) kind: CompilationKind,
89    /// Optional notifier for synchronous callers, passed through from the job.
90    pub(crate) sync_notifier: SyncNotifier,
91    /// Generation at the time the job was dispatched.
92    pub(crate) generation: u64,
93    /// Wall-clock time spent compiling.
94    pub(crate) compile_duration: Duration,
95    /// Per-phase timing breakdown from the compiler.
96    pub(crate) timings: CompileTimings,
97}
98
99/// Successful compilation output.
100pub(crate) enum WorkerSuccess {
101    /// JIT compilation produced an in-memory function pointer.
102    Jit(JitSuccess),
103    /// AOT compilation produced shared-library bytes.
104    Aot(AotSuccess),
105}
106
107/// Successful JIT compilation output.
108pub(crate) struct JitSuccess {
109    /// The compiled function pointer.
110    pub(crate) func: EvmCompilerFn,
111    /// Owns the JIT machine code via an ORCv2 `ResourceTracker`.
112    /// Dropping this frees the compiled code.
113    pub(crate) backing: Arc<JitCodeBacking>,
114}
115
/// Output of a successful AOT compilation.
pub(crate) struct AotSuccess {
    /// Name of the compiled function's symbol inside the shared library.
    pub(crate) symbol_name: String,
    /// Contents of the linked shared library (.so / .dylib).
    pub(crate) dylib_bytes: Vec<u8>,
    /// Byte length of the bytecode that was compiled.
    pub(crate) bytecode_len: usize,
}
125
/// Owner of one entry's JIT-compiled machine code.
///
/// Combines an ORCv2 `ResourceTracker` with a [`JitDylibGuard`]. Dropping the
/// backing calls `remove()` on the tracker, which frees this entry's machine
/// code. The guard keeps the owning `JITDylib` alive, so it is not cleared or
/// recycled while any guard still exists.
///
/// This is what makes per-entry eviction possible: once a `CompiledProgram`
/// leaves the resident map and every caller has released its
/// `Arc<CompiledProgram>`, the machine code is reclaimed.
pub(crate) struct JitCodeBacking {
    /// Tracker owning this entry's machine code.
    ///
    /// Declared before `_jd_guard` on purpose: fields drop in declaration
    /// order, and the JITDylib must still be valid while resources are
    /// removed from it.
    #[cfg(feature = "llvm")]
    tracker: Option<ResourceTracker>,
    /// Keeps the owning JITDylib alive; dropped after `tracker`.
    #[cfg(feature = "llvm")]
    _jd_guard: Arc<JitDylibGuard>,
}

impl JitCodeBacking {
    /// Wraps `tracker` together with the guard of the JITDylib it belongs to.
    #[cfg(feature = "llvm")]
    pub(crate) fn new(tracker: ResourceTracker, jd_guard: Arc<JitDylibGuard>) -> Self {
        let tracker = Some(tracker);
        Self { tracker, _jd_guard: jd_guard }
    }
}

#[cfg(feature = "llvm")]
impl Drop for JitCodeBacking {
    /// Removes the tracked machine code; a failure is logged, not propagated.
    fn drop(&mut self) {
        let Some(tracker) = self.tracker.take() else {
            return;
        };
        if let Err(e) = tracker.remove() {
            warn!("failed to remove JIT code: {e}");
        }
    }
}
165
166/// Handle to the worker pool.
167pub(crate) struct WorkerPool {
168    /// Rayon pool used to execute compilation jobs.
169    pool: Option<ThreadPool>,
170    /// Sender for worker results.
171    result_tx: chan::Sender<WorkerResult>,
172    /// Runtime configuration shared by spawned jobs.
173    config: Arc<RuntimeConfig>,
174    /// Number of queued jobs waiting to start.
175    queued: Arc<AtomicUsize>,
176    /// Maximum queued jobs accepted by the pool.
177    queue_capacity: usize,
178    /// Signals workers to stop accepting new work.
179    shutdown: Arc<AtomicBool>,
180}
181
182impl WorkerPool {
183    /// Creates and starts the worker pool.
184    pub(crate) fn new(result_tx: chan::Sender<WorkerResult>, config: RuntimeConfig) -> Self {
185        let worker_count = config.tuning.jit_worker_count;
186        let queue_capacity = worker_count.saturating_mul(config.tuning.jit_worker_queue_capacity);
187        let pool = (worker_count > 0).then(|| {
188            ThreadPoolBuilder::new()
189                .num_threads(worker_count)
190                .thread_name(|i| format!("revmc-{i:02}"))
191                .exit_handler(|_| clear_thread_local_compilers())
192                .build()
193                .expect("failed to spawn compile workers")
194        });
195
196        Self {
197            pool,
198            result_tx,
199            config: Arc::new(config),
200            queued: Arc::new(AtomicUsize::new(0)),
201            queue_capacity,
202            shutdown: Arc::new(AtomicBool::new(false)),
203        }
204    }
205
206    /// Tries to send a job to the worker pool.
207    /// Returns the job back on failure (queue full or shut down).
208    pub(crate) fn try_send(&mut self, job: CompileJob) -> Result<(), CompileJob> {
209        if self.pool.is_none() || self.shutdown.load(Ordering::Acquire) {
210            return Err(job);
211        }
212
213        let mut current = self.queued.load(Ordering::Acquire);
214        loop {
215            if current >= self.queue_capacity {
216                return Err(job);
217            }
218            match self.queued.compare_exchange_weak(
219                current,
220                current + 1,
221                Ordering::AcqRel,
222                Ordering::Acquire,
223            ) {
224                Ok(_) => break,
225                Err(next) => current = next,
226            }
227        }
228
229        let pool = self.pool.as_ref().unwrap();
230        let queued = self.queued.clone();
231        let result_tx = self.result_tx.clone();
232        let config = self.config.clone();
233        pool.spawn_fifo(move || {
234            queued.fetch_sub(1, Ordering::AcqRel);
235            let result = compile_job(job, &config);
236            let _ = result_tx.send(result);
237        });
238        Ok(())
239    }
240
241    /// Shuts down all workers after draining queued jobs.
242    pub(crate) fn shutdown(&mut self) {
243        self.shutdown.store(true, Ordering::Release);
244        if let Some(pool) = &self.pool {
245            pool.broadcast(|_| clear_thread_local_compilers());
246        }
247        self.pool.take();
248    }
249}
250
251impl Drop for WorkerPool {
252    fn drop(&mut self) {
253        self.shutdown();
254    }
255}
256
/// Drops the calling thread's JIT and AOT compiler state, leaving both slots
/// empty.
#[cfg(feature = "llvm")]
fn clear_thread_local_compilers() {
    // `take` moves the state out of the slot; it is dropped after the
    // `RefCell` borrow has been released.
    drop(JIT_COMPILER.take());
    drop(AOT_COMPILER.take());
}

/// Without the LLVM backend there is no thread-local compiler state to clear.
#[cfg(not(feature = "llvm"))]
fn clear_thread_local_compilers() {}
265
/// Runs `job` on the calling thread with the thread-local compiler that
/// matches the job's kind.
#[cfg(feature = "llvm")]
fn compile_job(job: CompileJob, config: &RuntimeConfig) -> WorkerResult {
    trace!(?job, "received job");
    let kind = job.kind;
    let run = |state: &mut Option<CompilerState>| compile_with_state(job, config, state);
    match kind {
        CompilationKind::Jit => JIT_COMPILER.with_borrow_mut(run),
        CompilationKind::Aot => AOT_COMPILER.with_borrow_mut(run),
    }
}
278
/// Compiles `job` using the compiler stored in `state_slot`, creating it first
/// when the slot is empty.
///
/// After the compilation the IR is cleared. Once the number of compilations
/// since the last recycle reaches `config.tuning.compiler_recycle_threshold`
/// (a threshold of zero disables this), the compiler is replaced by a fresh
/// one. The returned duration covers everything from entry to return.
#[cfg(feature = "llvm")]
fn compile_with_state(
    job: CompileJob,
    config: &RuntimeConfig,
    state_slot: &mut Option<CompilerState>,
) -> WorkerResult {
    let span = match job.kind {
        CompilationKind::Jit => {
            debug_span!("jit_compile", hash=%job.key.code_hash, spec_id=?job.key.spec_id)
        }
        CompilationKind::Aot => {
            debug_span!("aot_compile", hash=%job.key.code_hash, spec_id=?job.key.spec_id)
        }
    };
    let _entered = span.entered();
    let started = Instant::now();

    // Create the compiler on first use; if that fails, report it as this
    // job's outcome.
    let state = match state_slot {
        Some(existing) => existing,
        None => match CompilerState::new(config, job.kind) {
            Ok(fresh) => state_slot.insert(fresh),
            Err(e) => {
                error!(error = %e, "failed to create LLVM backend");
                return WorkerResult {
                    key: job.key,
                    outcome: Err(e),
                    kind: job.kind,
                    sync_notifier: job.sync_notifier,
                    generation: job.generation,
                    compile_duration: started.elapsed(),
                    timings: CompileTimings::default(),
                };
            }
        },
    };
    let compiler = &mut state.compiler;

    // Only JIT jobs get a dump directory: `<dump_dir>/<spec_id>/<code_hash>`.
    if let (CompilationKind::Jit, Some(base)) = (job.kind, &config.dump_dir) {
        let spec_component = format!("{:?}", job.key.spec_id);
        let hash_component = job.key.code_hash.to_string();
        compiler.set_dump_to(Some(base.join(spec_component).join(hash_component)));
    }
    compiler.set_opt_level(job.opt_level);

    let outcome = match job.kind {
        CompilationKind::Jit => compile_jit_artifact(&job, compiler),
        CompilationKind::Aot => compile_aot_artifact(&job, compiler),
    };
    let timings = compiler.take_timings();

    if let Err(err) = compiler.clear_ir() {
        warn!(%err, "clear_ir failed");
    }

    state.compilations_since_recycle += 1;
    let threshold = config.tuning.compiler_recycle_threshold;
    if threshold > 0 && state.compilations_since_recycle >= threshold {
        debug!(compilations_since_recycle = state.compilations_since_recycle, "recycling compiler");
        match CompilerState::new(config, job.kind) {
            Ok(replacement) => {
                // The replacement is built first; the assignment then drops
                // the old compiler.
                *state_slot = Some(replacement);
                revmc_llvm::global_gc();
            }
            Err(e) => {
                error!(error = %e, "failed to recreate compiler");
                // Keep the old compiler and restart the count.
                state.compilations_since_recycle = 0;
            }
        }
    }

    let CompileJob { key, kind, sync_notifier, generation, .. } = job;
    WorkerResult {
        key,
        outcome,
        kind,
        sync_notifier,
        generation,
        compile_duration: started.elapsed(),
        timings,
    }
}
362
/// A compiler plus the number of compilations it has run since it was created
/// or last recycled.
#[cfg(feature = "llvm")]
struct CompilerState {
    /// The compiler instance.
    compiler: EvmCompiler<EvmLlvmBackend>,
    /// Compilations performed with `compiler` since the last recycle.
    compilations_since_recycle: usize,
}

#[cfg(feature = "llvm")]
impl CompilerState {
    /// Creates a fresh compiler for `kind` with the counter at zero.
    ///
    /// # Errors
    ///
    /// Returns the error message from `create_compiler` when it fails.
    fn new(config: &RuntimeConfig, kind: CompilationKind) -> Result<Self, String> {
        let aot = matches!(kind, CompilationKind::Aot);
        create_compiler(config, aot)
            .map(|compiler| Self { compiler, compilations_since_recycle: 0 })
    }
}
378
#[cfg(feature = "llvm")]
thread_local! {
    /// This thread's compiler state for JIT jobs; `None` until first use and
    /// again after it has been cleared.
    static JIT_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
    /// This thread's compiler state for AOT jobs; `None` until first use and
    /// again after it has been cleared.
    static AOT_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
}
384
/// Creates an `EvmCompiler` on a new LLVM backend and applies the settings
/// from `config`.
///
/// `aot` selects the backend mode and whether the AOT or JIT optimization
/// level is used.
///
/// # Errors
///
/// Returns the backend's error, rendered as a string, when it cannot be
/// created.
#[cfg(feature = "llvm")]
fn create_compiler(
    config: &RuntimeConfig,
    aot: bool,
) -> Result<EvmCompiler<EvmLlvmBackend>, String> {
    let backend = match EvmLlvmBackend::new(aot) {
        Ok(backend) => backend,
        Err(e) => return Err(e.to_string()),
    };
    let opt_level =
        if aot { config.tuning.aot_opt_level } else { config.tuning.jit_opt_level };

    let mut compiler = EvmCompiler::new(backend);
    compiler.set_opt_level(opt_level);
    compiler.debug_assertions(config.debug_assertions);
    // The config stores these two as opt-outs, so invert them.
    compiler.set_dedup(!config.no_dedup);
    compiler.set_dse(!config.no_dse);
    if let Some(gas_params) = &config.gas_params {
        compiler.set_gas_params(gas_params.clone());
    }
    Ok(compiler)
}
405
/// JIT-compiles the job's bytecode and bundles the function pointer with the
/// backing that owns its machine code.
///
/// # Errors
///
/// Returns the compiler's error, rendered as a string, when JIT compilation
/// fails.
///
/// # Panics
///
/// Panics if the backend has no resource tracker after a successful JIT.
#[cfg(feature = "llvm")]
fn compile_jit_artifact(
    job: &CompileJob,
    compiler: &mut EvmCompiler<EvmLlvmBackend>,
) -> Result<WorkerSuccess, String> {
    // NOTE(review): `jit` is an unsafe call whose safety contract is not
    // visible in this file; confirm the requirements at its definition.
    let func = unsafe { compiler.jit(&job.symbol_name, &job.bytecode[..], job.key.spec_id) }
        .map_err(|err| {
            warn!(%err, "JIT compilation failed");
            err.to_string()
        })?;

    let jd_guard = compiler.backend_mut().jit_dylib_guard();
    debug!("JIT compilation succeeded");
    let tracker = compiler
        .backend_mut()
        .take_last_resource_tracker()
        .expect("no ResourceTracker after JIT");

    let backing = Arc::new(JitCodeBacking::new(tracker, jd_guard));
    Ok(WorkerSuccess::Jit(JitSuccess { func, backing }))
}
429
/// Compiles a single bytecode to a shared library and returns the raw bytes.
///
/// The bytecode is translated, written as an object file (`a.o`) into a fresh
/// temporary directory, linked into `a.so` next to it, and the linked file is
/// read back into memory.
///
/// # Errors
///
/// Returns a descriptive message when translation, temp-dir creation, object
/// emission, linking or reading the linked library fails, or when the object
/// path is not valid UTF-8.
#[cfg(feature = "llvm")]
fn compile_aot_artifact(
    job: &CompileJob,
    compiler: &mut EvmCompiler<EvmLlvmBackend>,
) -> Result<WorkerSuccess, String> {
    compiler
        .translate(&job.symbol_name, &job.bytecode[..], job.key.spec_id)
        .map_err(|e| format!("AOT translate failed: {e}"))?;

    let tmp_dir = tempfile::tempdir().map_err(|e| format!("failed to create temp dir: {e}"))?;

    let obj_path = tmp_dir.path().join("a.o");
    let so_path = tmp_dir.path().join("a.so");

    compiler
        .write_object_to_file(&obj_path)
        .map_err(|e| format!("AOT write object failed: {e}"))?;

    // The temp directory location is controlled by the environment and may not
    // be valid UTF-8; report that as a compile error instead of panicking on
    // the worker thread.
    let obj_path_str = obj_path.to_str().ok_or_else(|| {
        format!("AOT object path is not valid UTF-8: {}", obj_path.display())
    })?;

    let linker = Linker::new();
    linker.link(&so_path, [obj_path_str]).map_err(|e| format!("AOT link failed: {e}"))?;

    let mut dylib_bytes = Vec::new();
    File::open(&so_path)
        .and_then(|mut f| f.read_to_end(&mut dylib_bytes))
        .map_err(|e| format!("failed to read linked .so: {e}"))?;

    debug!(
        bytecode_len = job.bytecode.len(),
        dylib_len = dylib_bytes.len(),
        "AOT compilation succeeded",
    );

    Ok(WorkerSuccess::Aot(AotSuccess {
        symbol_name: job.symbol_name.clone(),
        dylib_bytes,
        bytecode_len: job.bytecode.len(),
    }))
}
471
472#[cfg(not(feature = "llvm"))]
473fn compile_job(job: CompileJob, _config: &RuntimeConfig) -> WorkerResult {
474    WorkerResult {
475        key: job.key,
476        outcome: Err("LLVM backend not available".into()),
477        kind: job.kind,
478        sync_notifier: job.sync_notifier,
479        generation: job.generation,
480        compile_duration: Duration::ZERO,
481        timings: CompileTimings::default(),
482    }
483}