Skip to main content

revmc_runtime/runtime/
backend.rs

1use crate::{
2    EvmCompilerFn, eyre,
3    runtime::{
4        LookupRequest,
5        api::{CompiledProgram, LoadedLibrary, ProgramKind},
6        config::{CompilationEvent, CompilationKind, RuntimeConfig, RuntimeTuning},
7        storage::{
8            ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeCacheKey,
9        },
10        worker::{
11            AotSuccess, CompileJob, JitCodeBacking, JitObjectSuccess, SyncNotifier, WorkerPool,
12            WorkerResult, WorkerSuccess,
13        },
14    },
15};
16use alloy_primitives::{
17    Bytes, keccak256,
18    map::{DefaultHashBuilder, HashMap},
19};
20use crossbeam_channel as chan;
21use crossbeam_queue::ArrayQueue;
22use dashmap::DashMap;
23use quanta::Instant;
24use std::{
25    ffi::CString,
26    mem,
27    ops::ControlFlow,
28    sync::{Arc, atomic::Ordering},
29    time::{SystemTime, UNIX_EPOCH},
30};
31
32#[cfg(feature = "llvm")]
33use crate::llvm::jit_memory_usage;
34#[cfg(feature = "llvm")]
35use revmc_context::RawEvmCompilerFn;
36
37/// The resident map type: code_hash+spec_id → compiled program.
38pub(crate) type ResidentMap = DashMap<RuntimeCacheKey, Arc<CompiledProgram>, DefaultHashBuilder>;
39
40/// Bounded MPMC lock-free queue of lookup-observed events.
41///
42/// Producers (lookup hot path) push without blocking; on overflow the event
43/// is silently dropped (`stats.events_dropped` is bumped). The backend
44/// drains via `pop` on every loop iteration. Hotness signal is best-effort.
45pub(crate) type EventQueue = ArrayQueue<LookupRequest>;
46
47/// Per-entry metadata tracked alongside the resident map for eviction decisions.
48struct ResidentMeta {
49    /// When this entry was last hit by a lookup.
50    last_hit_at: Instant,
51}
52
53/// Returns the total bytes of JIT-allocated memory via the memory plugin.
54fn jit_total_bytes() -> usize {
55    #[cfg(feature = "llvm")]
56    {
57        jit_memory_usage().map(|u| u.total_bytes()).unwrap_or(0)
58    }
59    #[cfg(not(feature = "llvm"))]
60    {
61        0
62    }
63}
64
65#[cfg(feature = "llvm")]
66struct JitObjectLinker {
67    backend: Option<crate::EvmLlvmBackend>,
68}
69
70#[cfg(feature = "llvm")]
71impl JitObjectLinker {
72    const fn new() -> Self {
73        Self { backend: None }
74    }
75
76    fn link(
77        &mut self,
78        success: &JitObjectSuccess,
79    ) -> eyre::Result<(EvmCompilerFn, Arc<JitCodeBacking>)> {
80        let backend = match &mut self.backend {
81            Some(backend) => backend,
82            None => self.backend.insert(crate::EvmLlvmBackend::new(false)?),
83        };
84
85        let symbol_name = CString::new(success.symbol_name.clone())?;
86        let builtin_symbols = success
87            .builtin_symbols
88            .iter()
89            .map(|name| {
90                let addr = revmc_builtins::Builtin::parse(name)
91                    .ok_or_else(|| eyre::eyre!("unknown builtin symbol: {name}"))?
92                    .addr();
93                Ok((CString::new(name.as_str())?, addr))
94            })
95            .collect::<eyre::Result<Vec<_>>>()?;
96        let (addr, tracker, jd_guard) = backend.link_jit_object_in_fresh_dylib(
97            &symbol_name,
98            &success.object_bytes,
99            &builtin_symbols,
100        )?;
101        let func =
102            EvmCompilerFn::new(unsafe { std::mem::transmute::<usize, RawEvmCompilerFn>(addr) });
103        Ok((func, Arc::new(JitCodeBacking::new(tracker, jd_guard))))
104    }
105}
106
107#[cfg(not(feature = "llvm"))]
108struct JitObjectLinker;
109
110#[cfg(not(feature = "llvm"))]
111impl JitObjectLinker {
112    const fn new() -> Self {
113        Self
114    }
115
116    fn link(
117        &mut self,
118        _success: &JitObjectSuccess,
119    ) -> eyre::Result<(EvmCompilerFn, Arc<JitCodeBacking>)> {
120        eyre::bail!("LLVM backend not available")
121    }
122}
123
124/// Commands sent to the backend thread on the bounded command channel.
125///
126/// Lookup-observed events are NOT carried here — they go through the
127/// [`EventQueue`] to avoid waking the backend on every lookup.
128pub(crate) enum Command {
129    /// Explicit request to JIT-compile a bytecode.
130    CompileJit(CompileJitRequest),
131    /// Explicit request to prepare AOT artifacts.
132    PrepareAot(Vec<PrepareAotRequest>),
133    /// Clear the resident compiled map.
134    ClearResident,
135    /// Clear persisted artifacts from the artifact store.
136    ClearPersisted,
137    /// Clear both resident and persisted.
138    ClearAll,
139    /// Pause out-of-process helper execution.
140    Pause,
141    /// Resume out-of-process helper execution.
142    Resume,
143    /// Shut down the backend.
144    Shutdown,
145}
146
147/// An explicit JIT compilation request.
148pub(crate) struct CompileJitRequest {
149    /// The key to compile for.
150    pub(crate) key: RuntimeCacheKey,
151    /// The raw bytecode.
152    pub(crate) bytecode: Bytes,
153    /// Optional notifier for synchronous callers.
154    pub(crate) sync_notifier: SyncNotifier,
155}
156
157/// An explicit AOT preparation request.
158pub(crate) struct PrepareAotRequest {
159    /// The key to compile for.
160    pub(crate) key: RuntimeCacheKey,
161    /// The raw bytecode.
162    pub(crate) bytecode: Bytes,
163}
164
165/// Per-key state tracked by the backend.
166struct EntryState {
167    /// Number of observed misses.
168    hotness: u32,
169    /// Current phase.
170    phase: EntryPhase,
171    /// The bytecode for this key (captured from a miss event).
172    bytecode: Bytes,
173    /// When this entry was last observed.
174    last_observed_at: Instant,
175    /// Sync notifiers waiting for this entry to finish compiling.
176    pending_notifiers: Vec<SyncNotifier>,
177}
178
179/// Phase of a backend entry.
180#[derive(Clone, Copy, Debug, PartialEq, Eq)]
181enum EntryPhase {
182    /// Not yet hot enough for JIT.
183    Cold,
184    /// JIT compilation in progress on a worker.
185    Working,
186}
187
188/// Whether a JIT admission request was triggered by hot-path observation
189/// (gated on hotness + cold-entry cap) or by an explicit user request
190/// (unconditional, may carry a sync notifier).
191#[derive(Clone, Copy, Debug, PartialEq, Eq)]
192enum AdmitMode {
193    Observed,
194    Explicit,
195}
196
197/// All backend-thread-owned mutable state.
198struct BackendState {
199    /// Shared state (resident map, event queue, stats).
200    inner: Arc<super::BackendShared>,
201    /// Per-key metadata for eviction (backend-only).
202    resident_meta: HashMap<RuntimeCacheKey, ResidentMeta>,
203    /// Per-key tracking state (backend-only).
204    entries: HashMap<RuntimeCacheKey, EntryState>,
205    /// Worker pool for JIT compilation.
206    workers: WorkerPool,
207    /// Backend-thread-owned linker for out-of-process JIT objects.
208    jit_object_linker: JitObjectLinker,
209    /// Receiver for worker results.
210    result_rx: chan::Receiver<WorkerResult>,
211    /// Artifact store for persisted artifacts.
212    store: Option<Arc<dyn ArtifactStore>>,
213    /// Tuning knobs.
214    tuning: RuntimeTuning,
215    /// Whether observed misses compile AOT artifacts instead of JIT code.
216    aot: bool,
217    /// Number of keys currently in Working phase.
218    pending_jobs: usize,
219    /// Monotonically increasing generation counter, bumped on clear/invalidation.
220    generation: u64,
221    /// Last time an eviction sweep was run.
222    last_sweep: Instant,
223    /// Optional user callback for compilation events.
224    on_compilation: Option<Arc<dyn Fn(CompilationEvent) + Send + Sync>>,
225}
226
227impl BackendState {
228    fn handle(&mut self, cmd: Command) -> ControlFlow<()> {
229        match cmd {
230            Command::CompileJit(req) => self.handle_compile_jit(req),
231            Command::PrepareAot(reqs) => self.handle_prepare_aot(reqs),
232            Command::ClearResident => self.handle_clear_resident(),
233            Command::ClearPersisted => self.handle_clear_persisted(),
234            Command::ClearAll => self.handle_clear_all(),
235            Command::Pause => self.workers.pause(),
236            Command::Resume => self.workers.resume(),
237            Command::Shutdown => return ControlFlow::Break(()),
238        }
239        ControlFlow::Continue(())
240    }
241
242    fn tick(&mut self) {
243        self.drain_events();
244        self.run_eviction_sweep();
245    }
246
247    /// Drains all currently-queued lookup events.
248    fn drain_events(&mut self) {
249        // Cap per-iteration drain so a flood of events can't starve other
250        // work (commands, worker results, sweeps). Surplus events stay in the
251        // queue and are picked up next iteration.
252        for _ in 0..self.tuning.max_events_per_drain {
253            let Some(event) = self.inner.events.pop() else { break };
254            self.handle_lookup_observed(event);
255        }
256    }
257
258    fn handle_lookup_observed(&mut self, event: LookupRequest) {
259        let hit = event.code.is_empty();
260        if hit {
261            self.inner.stats.lookup_hits.fetch_add(1, Ordering::Relaxed);
262            if let Some(meta) = self.resident_meta.get_mut(&event.key) {
263                meta.last_hit_at = Instant::now();
264            }
265        } else {
266            self.inner.stats.lookup_misses.fetch_add(1, Ordering::Relaxed);
267            let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
268            self.try_admit(kind, event.key, event.code, SyncNotifier::none(), AdmitMode::Observed);
269        }
270    }
271
272    fn handle_compile_jit(&mut self, req: CompileJitRequest) {
273        let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
274        self.try_admit(kind, req.key, req.bytecode, req.sync_notifier, AdmitMode::Explicit);
275    }
276
277    fn handle_prepare_aot(&mut self, reqs: Vec<PrepareAotRequest>) {
278        for req in reqs {
279            self.try_admit(
280                CompilationKind::Aot,
281                req.key,
282                req.bytecode,
283                SyncNotifier::none(),
284                AdmitMode::Explicit,
285            );
286        }
287    }
288
289    /// Common admission path for JIT and AOT compilation requests.
290    ///
291    /// Handles the cold→working state machine, hotness gating, in-flight
292    /// dedup, persisted AOT probing, and worker dispatch. Observed promotion
293    /// is gated by hotness; explicit requests are unconditional.
294    fn try_admit(
295        &mut self,
296        kind: CompilationKind,
297        key: RuntimeCacheKey,
298        bytecode: Bytes,
299        sync_notifier: SyncNotifier,
300        mode: AdmitMode,
301    ) {
302        if self.inner.resident.contains_key(&key) {
303            sync_notifier.notify();
304            return;
305        }
306
307        if !self.tuning.should_compile(&bytecode) {
308            sync_notifier.notify();
309            return;
310        }
311
312        if matches!(mode, AdmitMode::Observed) {
313            let max_entries = self.tuning.jit_max_pending_jobs * 10;
314            if !self.entries.contains_key(&key) && self.entries.len() >= max_entries {
315                return;
316            }
317        }
318
319        if kind == CompilationKind::Aot && self.try_load_persisted_aot(&key) {
320            sync_notifier.notify();
321            return;
322        }
323
324        let now = Instant::now();
325        let entry = self.entries.entry(key).or_insert_with(|| EntryState {
326            hotness: 0,
327            phase: EntryPhase::Cold,
328            bytecode: bytecode.clone(),
329            last_observed_at: now,
330            pending_notifiers: Vec::new(),
331        });
332        entry.last_observed_at = now;
333
334        if entry.phase == EntryPhase::Working {
335            entry.pending_notifiers.push(sync_notifier);
336            return;
337        }
338
339        if matches!(mode, AdmitMode::Observed) {
340            entry.hotness = entry.hotness.saturating_add(1);
341            if (entry.hotness as usize) < self.tuning.jit_hot_threshold {
342                return;
343            }
344        }
345
346        if self.pending_jobs >= self.tuning.jit_max_pending_jobs {
347            sync_notifier.notify();
348            return;
349        }
350
351        let prefix = match kind {
352            CompilationKind::Jit => "jit",
353            CompilationKind::Aot => "aot",
354        };
355        let opt_level = match kind {
356            CompilationKind::Jit => self.tuning.jit_opt_level,
357            CompilationKind::Aot => self.tuning.aot_opt_level,
358        };
359        let symbol = format!("{prefix}_{:x}_{:?}", key.code_hash, key.spec_id);
360        let job = CompileJob {
361            kind,
362            key,
363            bytecode: entry.bytecode.clone(),
364            symbol_name: symbol,
365            opt_level,
366            sync_notifier,
367            generation: self.generation,
368        };
369
370        match self.workers.try_send(job) {
371            Ok(()) => {
372                debug!(
373                    code_hash = %key.code_hash,
374                    spec_id = ?key.spec_id,
375                    ?kind,
376                    hotness = entry.hotness,
377                    pending_jobs = self.pending_jobs + 1,
378                    "dispatched compilation",
379                );
380                entry.phase = EntryPhase::Working;
381                self.pending_jobs += 1;
382                self.inner.stats.compilations_dispatched.fetch_add(1, Ordering::Relaxed);
383            }
384            Err(job) => {
385                warn!(code_hash = %key.code_hash, "worker pool saturated, dropping request");
386                job.sync_notifier.notify();
387            }
388        }
389    }
390
391    /// Tries to load an already-persisted AOT artifact from the store into the resident map.
392    /// Returns `true` if the artifact was loaded successfully.
393    fn try_load_persisted_aot(&mut self, key: &RuntimeCacheKey) -> bool {
394        let store = match &self.store {
395            Some(s) => s,
396            None => return false,
397        };
398
399        let artifact_key = ArtifactKey {
400            runtime: *key,
401            backend: BackendSelection::Llvm,
402            opt_level: self.tuning.aot_opt_level,
403        };
404
405        match store.load(&artifact_key) {
406            Ok(Some(stored)) => {
407                match (|| -> eyre::Result<CompiledProgram> {
408                    let library = unsafe { libloading::Library::new(&stored.dylib_path) }
409                        .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
410                    let func: EvmCompilerFn = unsafe {
411                        let sym: libloading::Symbol<'_, EvmCompilerFn> =
412                            library.get(stored.manifest.symbol_name.as_bytes()).map_err(|e| {
413                                eyre::eyre!("symbol '{}': {e}", stored.manifest.symbol_name)
414                            })?;
415                        *sym
416                    };
417                    let library = Arc::new(LoadedLibrary::new(library));
418                    Ok(CompiledProgram::new_aot(*key, func, library))
419                })() {
420                    Ok(program) => {
421                        debug!(
422                            code_hash = %key.code_hash,
423                            spec_id = ?key.spec_id,
424                            "loaded existing AOT artifact from store, skipping recompilation",
425                        );
426                        self.insert_resident(*key, Arc::new(program));
427                        true
428                    }
429                    Err(e) => {
430                        warn!(
431                            code_hash = %key.code_hash,
432                            error = %e,
433                            "failed to load persisted AOT artifact, will recompile",
434                        );
435                        false
436                    }
437                }
438            }
439            Ok(None) => false,
440            Err(e) => {
441                warn!(
442                    code_hash = %key.code_hash,
443                    error = %e,
444                    "failed to probe artifact store",
445                );
446                false
447            }
448        }
449    }
450
451    fn handle_clear_resident(&mut self) {
452        self.workers.cancel_in_flight();
453        self.inner.resident.clear();
454        self.resident_meta.clear();
455        // Notify any pending sync callers before clearing entries.
456        for (_, entry) in self.entries.drain() {
457            for n in entry.pending_notifiers {
458                n.notify();
459            }
460        }
461        // Discard pending lookup events: they were observed before the clear
462        // and would otherwise get processed against the new generation.
463        while self.inner.events.pop().is_some() {}
464        // Bump generation so in-flight worker results from before the clear are discarded.
465        self.generation += 1;
466        debug!(generation = self.generation, "resident map cleared");
467    }
468
469    fn handle_clear_persisted(&mut self) {
470        if let Some(store) = &self.store {
471            if let Err(e) = store.clear() {
472                warn!(error = %e, "failed to clear artifact store");
473            } else {
474                debug!("artifact store cleared");
475            }
476        }
477    }
478
479    fn handle_clear_all(&mut self) {
480        self.handle_clear_resident();
481        self.handle_clear_persisted();
482    }
483
484    fn insert_resident(&mut self, key: RuntimeCacheKey, program: Arc<CompiledProgram>) {
485        self.inner.resident.insert(key, program);
486        self.resident_meta.insert(key, ResidentMeta { last_hit_at: Instant::now() });
487    }
488
489    fn remove_resident(&mut self, key: &RuntimeCacheKey) {
490        self.inner.resident.remove(key);
491        self.resident_meta.remove(key);
492    }
493
494    fn handle_worker_result(&mut self, result: WorkerResult) {
495        self.pending_jobs = self.pending_jobs.saturating_sub(1);
496
497        // Drain pending notifiers from the entry before processing.
498        let pending_notifiers = self
499            .entries
500            .get_mut(&result.key)
501            .map(|e| mem::take(&mut e.pending_notifiers))
502            .unwrap_or_default();
503
504        let notify = || {
505            result.sync_notifier.notify();
506            for n in pending_notifiers {
507                n.notify();
508            }
509        };
510
511        // Discard stale results from a previous generation (e.g. after clear).
512        if result.generation != self.generation {
513            debug!(
514                code_hash = %result.key.code_hash,
515                result_gen = result.generation,
516                current_gen = self.generation,
517                "discarding stale worker result",
518            );
519            self.entries.remove(&result.key);
520            notify();
521            return;
522        }
523
524        let kind = result.kind;
525        let success = result.outcome.is_ok();
526
527        if let Some(cb) = &self.on_compilation {
528            cb(CompilationEvent {
529                code_hash: result.key.code_hash,
530                spec_id: result.key.spec_id,
531                duration: result.compile_duration,
532                kind,
533                success,
534                timings: result.timings,
535            });
536        }
537
538        match result.outcome {
539            Ok(WorkerSuccess::Jit(success)) => {
540                let program =
541                    Arc::new(CompiledProgram::new_jit(result.key, success.func, success.backing));
542                self.insert_resident(result.key, program);
543                self.entries.remove(&result.key);
544                self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);
545
546                debug!(
547                    code_hash = %result.key.code_hash,
548                    spec_id = ?result.key.spec_id,
549                    compile_time = ?result.compile_duration,
550                    "JIT program published to resident map",
551                );
552            }
553            Ok(WorkerSuccess::Aot(success)) => {
554                self.handle_aot_success(result.key, success);
555            }
556            Ok(WorkerSuccess::JitObject(success)) => {
557                self.handle_jit_object_success(result.key, success, result.compile_duration);
558            }
559            Err(err) => {
560                self.entries.remove(&result.key);
561                self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
562
563                warn!(
564                    code_hash = %result.key.code_hash,
565                    error = %err,
566                    compile_time = ?result.compile_duration,
567                    "compilation failed",
568                );
569            }
570        }
571
572        notify();
573    }
574
575    fn handle_jit_object_success(
576        &mut self,
577        key: RuntimeCacheKey,
578        success: JitObjectSuccess,
579        compile_duration: std::time::Duration,
580    ) {
581        match self.jit_object_linker.link(&success) {
582            Ok((func, backing)) => {
583                let program = Arc::new(CompiledProgram::new_jit(key, func, backing));
584                self.insert_resident(key, program);
585                self.entries.remove(&key);
586                self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);
587
588                debug!(
589                    code_hash = %key.code_hash,
590                    spec_id = ?key.spec_id,
591                    compile_time = ?compile_duration,
592                    object_len = success.object_bytes.len(),
593                    "JIT object linked and published to resident map",
594                );
595            }
596            Err(err) => {
597                self.entries.remove(&key);
598                self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
599
600                warn!(
601                    code_hash = %key.code_hash,
602                    error = %err,
603                    compile_time = ?compile_duration,
604                    "failed to link JIT object",
605                );
606            }
607        }
608    }
609
610    fn handle_aot_success(&mut self, key: RuntimeCacheKey, success: AotSuccess) {
611        let artifact_key = ArtifactKey {
612            runtime: key,
613            backend: BackendSelection::Llvm,
614            opt_level: self.tuning.aot_opt_level,
615        };
616
617        let content_hash = keccak256(&success.dylib_bytes).0;
618
619        let manifest = ArtifactManifest {
620            artifact_key: artifact_key.clone(),
621            symbol_name: success.symbol_name.clone(),
622            bytecode_len: success.bytecode_len,
623            artifact_len: success.dylib_bytes.len(),
624            created_at_unix_secs: SystemTime::now()
625                .duration_since(UNIX_EPOCH)
626                .map(|d| d.as_secs())
627                .unwrap_or(0),
628            content_hash,
629        };
630
631        // Persist to store if available.
632        if let Some(store) = &self.store {
633            if let Err(e) = store.store(&artifact_key, &manifest, &success.dylib_bytes) {
634                warn!(
635                    code_hash = %key.code_hash,
636                    error = %e,
637                    "failed to persist AOT artifact",
638                );
639                self.entries.remove(&key);
640                self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
641                return;
642            }
643
644            debug!(
645                code_hash = %key.code_hash,
646                spec_id = ?key.spec_id,
647                dylib_len = success.dylib_bytes.len(),
648                "AOT artifact persisted to store",
649            );
650
651            // Load from store to get the canonical path, then dlopen.
652            match store.load(&artifact_key) {
653                Ok(Some(stored)) => {
654                    match (|| -> eyre::Result<CompiledProgram> {
655                        let library = unsafe { libloading::Library::new(&stored.dylib_path) }
656                            .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
657                        let func: EvmCompilerFn = unsafe {
658                            let sym: libloading::Symbol<'_, EvmCompilerFn> =
659                                library.get(success.symbol_name.as_bytes()).map_err(|e| {
660                                    eyre::eyre!("symbol '{}': {e}", success.symbol_name)
661                                })?;
662                            *sym
663                        };
664                        let library = Arc::new(LoadedLibrary::new(library));
665                        Ok(CompiledProgram::new_aot(key, func, library))
666                    })() {
667                        Ok(program) => {
668                            self.insert_resident(key, Arc::new(program));
669                            self.entries.remove(&key);
670                            self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);
671
672                            debug!(
673                                code_hash = %key.code_hash,
674                                spec_id = ?key.spec_id,
675                                "AOT program loaded into resident map",
676                            );
677                        }
678                        Err(e) => {
679                            warn!(
680                                code_hash = %key.code_hash,
681                                error = %e,
682                                "failed to load persisted AOT artifact",
683                            );
684                            // Persisted successfully but couldn't load — remove so JIT can retry.
685                            self.entries.remove(&key);
686                            self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
687                        }
688                    }
689                }
690                Ok(None) => {
691                    warn!(
692                        code_hash = %key.code_hash,
693                        "stored AOT artifact not found on reload",
694                    );
695                    self.entries.remove(&key);
696                    self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
697                }
698                Err(e) => {
699                    warn!(
700                        code_hash = %key.code_hash,
701                        error = %e,
702                        "failed to reload persisted AOT artifact",
703                    );
704                    self.entries.remove(&key);
705                    self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
706                }
707            }
708        } else {
709            // No store configured — can't persist, remove so JIT can retry.
710            warn!(
711                code_hash = %key.code_hash,
712                "AOT compilation completed but no artifact store configured",
713            );
714            self.entries.remove(&key);
715            self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
716        }
717    }
718
719    /// Runs an eviction sweep: removes idle entries and enforces the memory budget.
720    fn run_eviction_sweep(&mut self) {
721        if !self.should_sweep() {
722            return;
723        }
724
725        let now = Instant::now();
726        self.last_sweep = now;
727
728        let idle_duration = self.tuning.idle_evict_duration;
729        let budget = self.tuning.resident_code_cache_bytes;
730
731        // Phase 1: evict idle entries.
732        if let Some(idle) = idle_duration {
733            let idle_keys: Vec<RuntimeCacheKey> = self
734                .resident_meta
735                .iter()
736                .filter(|(_, meta)| now.duration_since(meta.last_hit_at) > idle)
737                .map(|(key, _)| *key)
738                .collect();
739
740            for key in &idle_keys {
741                debug!(
742                    code_hash = %key.code_hash,
743                    spec_id = ?key.spec_id,
744                    "evicting idle entry",
745                );
746                self.remove_resident(key);
747                self.entries.remove(key);
748                self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
749            }
750        }
751
752        // Phase 2: evict stale cold entries that never became hot enough to compile.
753        if let Some(idle) = idle_duration {
754            let resident = &self.inner.resident;
755            self.entries.retain(|key, entry| {
756                let stale = entry.phase == EntryPhase::Cold
757                    && now.duration_since(entry.last_observed_at) > idle
758                    && !resident.contains_key(key);
759                !stale
760            });
761        }
762
763        // Phase 3: enforce memory budget by evicting LRU JIT entries.
764        if budget > 0 && jit_total_bytes() > budget {
765            // Collect JIT entries sorted by last_hit_at ascending (oldest first).
766            // AOT entries are excluded because they don't contribute to `jit_total_bytes()`.
767            let mut entries: Vec<(RuntimeCacheKey, Instant)> = self
768                .resident_meta
769                .iter()
770                .filter(|(key, _)| {
771                    self.inner.resident.get(key).is_some_and(|p| matches!(p.kind, ProgramKind::Jit))
772                })
773                .map(|(key, meta)| (*key, meta.last_hit_at))
774                .collect();
775            entries.sort_by_key(|(_, t)| *t);
776
777            for (key, _) in entries {
778                if jit_total_bytes() <= budget {
779                    break;
780                }
781                debug!(
782                    code_hash = %key.code_hash,
783                    spec_id = ?key.spec_id,
784                    "evicting entry to stay within memory budget",
785                );
786                self.remove_resident(&key);
787                self.entries.remove(&key);
788                self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
789            }
790        }
791    }
792
793    /// Returns whether eviction is configured and a sweep is due.
794    fn should_sweep(&self) -> bool {
795        // Over budget — sweep immediately regardless of interval.
796        let maxrss = self.tuning.resident_code_cache_bytes;
797        if maxrss > 0 && jit_total_bytes() > maxrss {
798            return true;
799        }
800        self.tuning.idle_evict_duration.is_some()
801            && self.last_sweep.elapsed() >= self.tuning.eviction_sweep_interval
802    }
803}
804
805/// Runs the backend event loop. Called on the backend thread.
806pub(crate) fn run(
807    inner: Arc<super::BackendShared>,
808    cmd_rx: chan::Receiver<Command>,
809    config: RuntimeConfig,
810) {
811    debug!("backend thread started");
812
813    let (result_tx, result_rx) = chan::unbounded::<WorkerResult>();
814
815    let workers = WorkerPool::new(result_tx, config.clone(), Arc::clone(&inner.stats));
816
817    let sweep_interval = config.tuning.eviction_sweep_interval;
818    let event_drain_interval = config.tuning.event_drain_interval;
819
820    // Seed resident metadata from startup-preloaded AOT entries.
821    let now = Instant::now();
822    let mut preload_meta = HashMap::default();
823    for entry in inner.resident.iter() {
824        preload_meta.insert(*entry.key(), ResidentMeta { last_hit_at: now });
825    }
826
827    let mut state = BackendState {
828        inner,
829        resident_meta: preload_meta,
830        entries: HashMap::default(),
831        workers,
832        jit_object_linker: JitObjectLinker::new(),
833        result_rx,
834        store: config.store,
835        tuning: config.tuning,
836        aot: config.aot,
837        pending_jobs: 0,
838        generation: 0,
839        last_sweep: now,
840        on_compilation: config.on_compilation,
841    };
842
843    // Tick interval is min(event_drain, sweep) so we never sleep longer than
844    // either. Events are drained on every wakeup regardless of cause.
845    let tick = event_drain_interval.min(sweep_interval);
846    let shutdown_reason;
847
848    loop {
849        chan::select! {
850            recv(cmd_rx) -> msg => {
851                let Ok(cmd) = msg else {
852                    shutdown_reason = "channel closed";
853                    break;
854                };
855                if state.handle(cmd).is_break() {
856                    shutdown_reason = "shutdown command";
857                    break;
858                }
859            }
860            recv(state.result_rx) -> msg => {
861                match msg {
862                    Ok(result) => state.handle_worker_result(result),
863                    Err(_) => warn!("worker unexpectedly closed"),
864                }
865            }
866            default(tick) => {}
867        }
868        state.tick();
869    }
870
871    debug!(?shutdown_reason, stats = ?state.inner.stats(), "backend task shutting down");
872
873    state.workers.shutdown();
874    while state.result_rx.try_recv().is_ok() {}
875}
876
877#[cfg(all(test, feature = "llvm"))]
878mod tests {
879    use super::*;
880    use crate::runtime::worker::{
881        CompileJob, WorkerSuccess, compile_jit_object_artifact, create_compiler,
882    };
883    use revm_primitives::hardfork::SpecId;
884
885    /// PUSH1 0x42 PUSH0 MSTORE PUSH1 0x20 PUSH0 RETURN.
886    const BYTECODE_RET42: &[u8] = &[0x60, 0x42, 0x5f, 0x52, 0x60, 0x20, 0x5f, 0xf3];
887
888    fn compile_jit_object(symbol_name: &str) -> JitObjectSuccess {
889        let config = RuntimeConfig::default();
890        let mut compiler = create_compiler(&config, true).unwrap();
891        let job = CompileJob {
892            kind: CompilationKind::Jit,
893            key: RuntimeCacheKey { code_hash: keccak256(BYTECODE_RET42), spec_id: SpecId::CANCUN },
894            bytecode: Bytes::copy_from_slice(BYTECODE_RET42),
895            symbol_name: symbol_name.to_owned(),
896            opt_level: config.tuning.jit_opt_level,
897            sync_notifier: SyncNotifier::none(),
898            generation: 0,
899        };
900
901        match compile_jit_object_artifact(&job, &mut compiler).unwrap() {
902            WorkerSuccess::JitObject(success) => success,
903            _ => unreachable!(),
904        }
905    }
906
907    #[test]
908    fn jit_object_linker_relinks_live_symbol_name() {
909        let success = compile_jit_object("jit_duplicate_symbol");
910        let success2 = JitObjectSuccess {
911            symbol_name: success.symbol_name.clone(),
912            object_bytes: success.object_bytes.clone(),
913            builtin_symbols: success.builtin_symbols.clone(),
914        };
915
916        let mut linker = JitObjectLinker::new();
917        let (_first_func, _first_backing) = linker.link(&success).unwrap();
918        let (_second_func, _second_backing) = linker
919            .link(&success2)
920            .expect("same symbol should link while previous backing remains alive");
921    }
922}