Skip to main content

revmc_runtime/runtime/
backend.rs

1use crate::{
2    EvmCompilerFn, eyre,
3    runtime::{
4        LookupRequest,
5        api::{CompiledProgram, LoadedLibrary, ProgramKind},
6        config::{CompilationEvent, CompilationKind, RuntimeConfig, RuntimeTuning},
7        storage::{
8            ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeCacheKey,
9        },
10        worker::{AotSuccess, CompileJob, SyncNotifier, WorkerPool, WorkerResult, WorkerSuccess},
11    },
12};
13use alloy_primitives::{
14    Bytes, keccak256,
15    map::{DefaultHashBuilder, HashMap},
16};
17use crossbeam_channel as chan;
18use crossbeam_queue::ArrayQueue;
19use dashmap::DashMap;
20use quanta::Instant;
21use std::{
22    mem,
23    ops::ControlFlow,
24    sync::{Arc, atomic::Ordering},
25    time::{SystemTime, UNIX_EPOCH},
26};
27
28#[cfg(feature = "llvm")]
29use crate::llvm::jit_memory_usage;
30
/// The resident map type: code_hash+spec_id → compiled program.
///
/// Concurrent map shared via `BackendShared`: readers look programs up
/// directly while the backend thread inserts and evicts entries (see
/// `BackendState::insert_resident` / `remove_resident`).
pub(crate) type ResidentMap = DashMap<RuntimeCacheKey, Arc<CompiledProgram>, DefaultHashBuilder>;
33
/// Bounded MPMC lock-free queue of lookup-observed events.
///
/// Producers (lookup hot path) push without blocking; on overflow the event
/// is silently dropped (`stats.events_dropped` is bumped). The backend
/// drains via `pop` on every loop iteration. Hotness signal is best-effort.
///
/// The capacity is fixed at construction (`ArrayQueue`), so a stalled
/// backend can never cause unbounded memory growth on the hot path.
pub(crate) type EventQueue = ArrayQueue<LookupRequest>;
40
/// Per-entry metadata tracked alongside the resident map for eviction decisions.
///
/// Owned exclusively by the backend thread and kept in lock-step with the
/// shared resident map by `insert_resident` / `remove_resident`.
struct ResidentMeta {
    /// When this entry was last hit by a lookup. Refreshed on observed hits
    /// and consulted by the idle/LRU eviction sweep.
    last_hit_at: Instant,
}
46
/// Returns the total bytes of JIT-allocated memory via the memory plugin.
///
/// Without the `llvm` feature there is no JIT allocator, so this always
/// reports 0 (which also disables the memory-budget eviction phase).
fn jit_total_bytes() -> usize {
    #[cfg(feature = "llvm")]
    if let Some(usage) = jit_memory_usage() {
        return usage.total_bytes();
    }
    0
}
58
/// Commands sent to the backend thread on the bounded command channel.
///
/// Lookup-observed events are NOT carried here — they go through the
/// [`EventQueue`] to avoid waking the backend on every lookup.
pub(crate) enum Command {
    /// Explicit request to JIT-compile a bytecode (bypasses the hotness gate).
    CompileJit(CompileJitRequest),
    /// Explicit request to prepare AOT artifacts (batched, fire-and-forget).
    PrepareAot(Vec<PrepareAotRequest>),
    /// Clear the resident compiled map. Bumps the generation counter so
    /// in-flight worker results from before the clear are discarded.
    ClearResident,
    /// Clear persisted artifacts from the artifact store.
    ClearPersisted,
    /// Clear both resident and persisted.
    ClearAll,
    /// Shut down the backend.
    Shutdown,
}
77
/// An explicit JIT compilation request.
///
/// Note: when the runtime is configured AOT-first (`config.aot`), this
/// request prepares an AOT artifact instead (see
/// `BackendState::handle_compile_jit`).
pub(crate) struct CompileJitRequest {
    /// The key to compile for.
    pub(crate) key: RuntimeCacheKey,
    /// The raw bytecode.
    pub(crate) bytecode: Bytes,
    /// Optional notifier for synchronous callers; notified when the request
    /// completes, is satisfied by an existing entry, or is dropped.
    pub(crate) sync_notifier: SyncNotifier,
}
87
/// An explicit AOT preparation request. Always fire-and-forget — unlike
/// [`CompileJitRequest`], it carries no sync notifier.
pub(crate) struct PrepareAotRequest {
    /// The key to compile for.
    pub(crate) key: RuntimeCacheKey,
    /// The raw bytecode.
    pub(crate) bytecode: Bytes,
}
95
/// Per-key state tracked by the backend.
struct EntryState {
    /// Number of observed misses (saturating); compared against
    /// `jit_hot_threshold` to gate observed promotion.
    hotness: u32,
    /// Current phase.
    phase: EntryPhase,
    /// The bytecode for this key (captured from the first event that created
    /// the entry).
    bytecode: Bytes,
    /// Sync notifiers waiting for this entry to finish compiling; drained
    /// and notified when the worker result arrives.
    pending_notifiers: Vec<SyncNotifier>,
}
107
/// Phase of a backend entry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EntryPhase {
    /// Not yet hot enough for JIT.
    Cold,
    /// JIT compilation in progress on a worker. Further requests for the
    /// same key park their notifiers on the entry instead of re-dispatching.
    Working,
}
116
/// Whether a JIT admission request was triggered by hot-path observation
/// (gated on hotness + cold-entry cap) or by an explicit user request
/// (unconditional, may carry a sync notifier).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AdmitMode {
    /// Triggered by an observed lookup miss from the event queue.
    Observed,
    /// Triggered by an explicit `CompileJit`/`PrepareAot` command.
    Explicit,
}
125
/// All backend-thread-owned mutable state.
struct BackendState {
    /// Shared state (resident map, event queue, stats).
    inner: Arc<super::BackendShared>,
    /// Per-key metadata for eviction (backend-only).
    resident_meta: HashMap<RuntimeCacheKey, ResidentMeta>,
    /// Per-key tracking state (backend-only).
    entries: HashMap<RuntimeCacheKey, EntryState>,
    /// Worker pool for JIT compilation.
    workers: WorkerPool,
    /// Receiver for worker results.
    result_rx: chan::Receiver<WorkerResult>,
    /// Artifact store for persisted artifacts. `None` disables persistence
    /// (AOT results then fail with a warning).
    store: Option<Arc<dyn ArtifactStore>>,
    /// Tuning knobs.
    tuning: RuntimeTuning,
    /// Whether observed misses compile AOT artifacts instead of JIT code.
    aot: bool,
    /// Number of keys currently in Working phase; bounds dispatch via
    /// `tuning.jit_max_pending_jobs`.
    pending_jobs: usize,
    /// Monotonically increasing generation counter, bumped on clear/invalidation.
    generation: u64,
    /// Last time an eviction sweep was run.
    last_sweep: Instant,
    /// Optional user callback for compilation events.
    on_compilation: Option<Arc<dyn Fn(CompilationEvent) + Send + Sync>>,
}
153
impl BackendState {
    /// Processes one command from the bounded command channel.
    ///
    /// Returns [`ControlFlow::Break`] only for [`Command::Shutdown`]; the
    /// event loop exits on break.
    fn handle(&mut self, cmd: Command) -> ControlFlow<()> {
        match cmd {
            Command::CompileJit(req) => self.handle_compile_jit(req),
            Command::PrepareAot(reqs) => self.handle_prepare_aot(reqs),
            Command::ClearResident => self.handle_clear_resident(),
            Command::ClearPersisted => self.handle_clear_persisted(),
            Command::ClearAll => self.handle_clear_all(),
            Command::Shutdown => return ControlFlow::Break(()),
        }
        ControlFlow::Continue(())
    }

    /// Housekeeping run on every event-loop wakeup: drain queued lookup
    /// events, then run an eviction sweep if one is due.
    fn tick(&mut self) {
        self.drain_events();
        self.run_eviction_sweep();
    }

    /// Drains all currently-queued lookup events.
    fn drain_events(&mut self) {
        // Cap per-iteration drain so a flood of events can't starve other
        // work (commands, worker results, sweeps). Surplus events stay in the
        // queue and are picked up next iteration.
        for _ in 0..self.tuning.max_events_per_drain {
            let Some(event) = self.inner.events.pop() else { break };
            self.handle_lookup_observed(event);
        }
    }

    /// Handles a single lookup event popped from the [`EventQueue`].
    ///
    /// An event carrying empty `code` is treated as a resident-map hit and
    /// only refreshes the entry's LRU timestamp (NOTE(review): presumably
    /// the lookup path attaches bytecode only on a miss — confirm against
    /// the producer). Non-empty `code` is a miss and feeds the hotness gate
    /// via [`Self::try_admit`].
    fn handle_lookup_observed(&mut self, event: LookupRequest) {
        let hit = event.code.is_empty();
        if hit {
            self.inner.stats.lookup_hits.fetch_add(1, Ordering::Relaxed);
            if let Some(meta) = self.resident_meta.get_mut(&event.key) {
                meta.last_hit_at = Instant::now();
            }
        } else {
            self.inner.stats.lookup_misses.fetch_add(1, Ordering::Relaxed);
            // AOT-first runtimes prepare persistent artifacts for observed
            // misses instead of JIT-compiling in-process.
            let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
            self.try_admit(kind, event.key, event.code, SyncNotifier::none(), AdmitMode::Observed);
        }
    }

    /// Handles an explicit compile request; bypasses the hotness gate.
    fn handle_compile_jit(&mut self, req: CompileJitRequest) {
        // Even explicit "JIT" requests produce AOT artifacts when the
        // runtime is configured AOT-first.
        let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
        self.try_admit(kind, req.key, req.bytecode, req.sync_notifier, AdmitMode::Explicit);
    }

    /// Handles an explicit batch of AOT preparation requests.
    fn handle_prepare_aot(&mut self, reqs: Vec<PrepareAotRequest>) {
        for req in reqs {
            self.try_admit(
                CompilationKind::Aot,
                req.key,
                req.bytecode,
                SyncNotifier::none(),
                AdmitMode::Explicit,
            );
        }
    }

    /// Common admission path for JIT and AOT compilation requests.
    ///
    /// Handles the cold→working state machine, hotness gating, in-flight
    /// dedup, persisted AOT probing, and worker dispatch. Observed promotion
    /// is gated by hotness; explicit requests are unconditional.
    ///
    /// Every early-return notifies `sync_notifier` except the observed-only
    /// gates (cold-entry cap, hotness threshold), which only ever carry a
    /// no-op notifier, and the Working dedup path, which parks it.
    fn try_admit(
        &mut self,
        kind: CompilationKind,
        key: RuntimeCacheKey,
        bytecode: Bytes,
        sync_notifier: SyncNotifier,
        mode: AdmitMode,
    ) {
        // Already compiled and resident — nothing to do.
        if self.inner.resident.contains_key(&key) {
            sync_notifier.notify();
            return;
        }

        // Policy filter (e.g. bytecode size limits) from the tuning knobs.
        if !self.tuning.should_compile(&bytecode) {
            sync_notifier.notify();
            return;
        }

        // Cap the number of tracked entries for observed traffic so high
        // miss churn can't grow the `entries` map without bound. Explicit
        // requests are exempt.
        if matches!(mode, AdmitMode::Observed) {
            let max_entries = self.tuning.jit_max_pending_jobs * 10;
            if !self.entries.contains_key(&key) && self.entries.len() >= max_entries {
                return;
            }
        }

        // A previously persisted AOT artifact satisfies the request without
        // recompiling.
        if kind == CompilationKind::Aot && self.try_load_persisted_aot(&key) {
            sync_notifier.notify();
            return;
        }

        let entry = self.entries.entry(key).or_insert_with(|| EntryState {
            hotness: 0,
            phase: EntryPhase::Cold,
            bytecode: bytecode.clone(),
            pending_notifiers: Vec::new(),
        });

        // Already compiling: dedup by parking the notifier on the entry; it
        // is woken when the worker result arrives.
        if entry.phase == EntryPhase::Working {
            entry.pending_notifiers.push(sync_notifier);
            return;
        }

        // Observed misses must reach the hotness threshold before dispatch.
        if matches!(mode, AdmitMode::Observed) {
            entry.hotness = entry.hotness.saturating_add(1);
            if (entry.hotness as usize) < self.tuning.jit_hot_threshold {
                return;
            }
        }

        // Backpressure: too many jobs in flight. The entry stays Cold and a
        // later admission attempt will retry.
        if self.pending_jobs >= self.tuning.jit_max_pending_jobs {
            sync_notifier.notify();
            return;
        }

        let prefix = match kind {
            CompilationKind::Jit => "jit",
            CompilationKind::Aot => "aot",
        };
        let opt_level = match kind {
            CompilationKind::Jit => self.tuning.jit_opt_level,
            CompilationKind::Aot => self.tuning.aot_opt_level,
        };
        let symbol = format!("{prefix}_{:x}_{:?}", key.code_hash, key.spec_id);
        let job = CompileJob {
            kind,
            key,
            bytecode: entry.bytecode.clone(),
            symbol_name: symbol,
            opt_level,
            sync_notifier,
            // Tagged so results from before a clear can be discarded.
            generation: self.generation,
        };

        match self.workers.try_send(job) {
            Ok(()) => {
                debug!(
                    code_hash = %key.code_hash,
                    spec_id = ?key.spec_id,
                    ?kind,
                    hotness = entry.hotness,
                    pending_jobs = self.pending_jobs + 1,
                    "dispatched compilation",
                );
                entry.phase = EntryPhase::Working;
                self.pending_jobs += 1;
                self.inner.stats.compilations_dispatched.fetch_add(1, Ordering::Relaxed);
            }
            Err(job) => {
                // Worker queue full: drop the request but wake any sync
                // caller so it doesn't block forever.
                warn!(code_hash = %key.code_hash, "worker pool saturated, dropping request");
                job.sync_notifier.notify();
            }
        }
    }

    /// Tries to load an already-persisted AOT artifact from the store into the resident map.
    /// Returns `true` if the artifact was loaded successfully.
    fn try_load_persisted_aot(&mut self, key: &RuntimeCacheKey) -> bool {
        let store = match &self.store {
            Some(s) => s,
            None => return false,
        };

        let artifact_key = ArtifactKey {
            runtime: *key,
            backend: BackendSelection::Llvm,
            opt_level: self.tuning.aot_opt_level,
        };

        match store.load(&artifact_key) {
            Ok(Some(stored)) => {
                // dlopen + symbol lookup, wrapped in an immediately-invoked
                // closure so every failure funnels to one warn-and-recompile
                // path. NOTE(review): dlopen runs the dylib's initializers —
                // assumes store contents are trusted local build output.
                match (|| -> eyre::Result<CompiledProgram> {
                    let library = unsafe { libloading::Library::new(&stored.dylib_path) }
                        .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
                    let func: EvmCompilerFn = unsafe {
                        let sym: libloading::Symbol<'_, EvmCompilerFn> =
                            library.get(stored.manifest.symbol_name.as_bytes()).map_err(|e| {
                                eyre::eyre!("symbol '{}': {e}", stored.manifest.symbol_name)
                            })?;
                        *sym
                    };
                    // The library handle is kept alive alongside the function
                    // pointer so the mapped code is never unloaded under it.
                    let library = Arc::new(LoadedLibrary::new(library));
                    Ok(CompiledProgram::new_aot(*key, func, library))
                })() {
                    Ok(program) => {
                        debug!(
                            code_hash = %key.code_hash,
                            spec_id = ?key.spec_id,
                            "loaded existing AOT artifact from store, skipping recompilation",
                        );
                        self.insert_resident(*key, Arc::new(program));
                        true
                    }
                    Err(e) => {
                        warn!(
                            code_hash = %key.code_hash,
                            error = %e,
                            "failed to load persisted AOT artifact, will recompile",
                        );
                        false
                    }
                }
            }
            Ok(None) => false,
            Err(e) => {
                warn!(
                    code_hash = %key.code_hash,
                    error = %e,
                    "failed to probe artifact store",
                );
                false
            }
        }
    }

    /// Clears the resident map, wakes all parked sync waiters, discards
    /// queued events, and bumps the generation to invalidate in-flight work.
    fn handle_clear_resident(&mut self) {
        self.inner.resident.clear();
        self.resident_meta.clear();
        // Notify any pending sync callers before clearing entries.
        for (_, entry) in self.entries.drain() {
            for n in entry.pending_notifiers {
                n.notify();
            }
        }
        // Discard pending lookup events: they were observed before the clear
        // and would otherwise get processed against the new generation.
        while self.inner.events.pop().is_some() {}
        // Bump generation so in-flight worker results from before the clear are discarded.
        self.generation += 1;
        debug!(generation = self.generation, "resident map cleared");
    }

    /// Clears the persisted artifact store, if one is configured.
    fn handle_clear_persisted(&mut self) {
        if let Some(store) = &self.store {
            if let Err(e) = store.clear() {
                warn!(error = %e, "failed to clear artifact store");
            } else {
                debug!("artifact store cleared");
            }
        }
    }

    /// Clears both the resident map and the persisted store.
    fn handle_clear_all(&mut self) {
        self.handle_clear_resident();
        self.handle_clear_persisted();
    }

    /// Publishes a program to the shared resident map and records fresh
    /// eviction metadata for it, keeping the two in lock-step.
    fn insert_resident(&mut self, key: RuntimeCacheKey, program: Arc<CompiledProgram>) {
        self.inner.resident.insert(key, program);
        self.resident_meta.insert(key, ResidentMeta { last_hit_at: Instant::now() });
    }

    /// Removes a program from both the shared resident map and the
    /// backend-local eviction metadata.
    fn remove_resident(&mut self, key: &RuntimeCacheKey) {
        self.inner.resident.remove(key);
        self.resident_meta.remove(key);
    }

    /// Processes a finished worker job: publishes the program (JIT) or
    /// persists+loads it (AOT), updates stats, fires the user callback, and
    /// wakes all sync waiters for the key.
    fn handle_worker_result(&mut self, result: WorkerResult) {
        self.pending_jobs = self.pending_jobs.saturating_sub(1);

        // Drain pending notifiers from the entry before processing.
        let pending_notifiers = self
            .entries
            .get_mut(&result.key)
            .map(|e| mem::take(&mut e.pending_notifiers))
            .unwrap_or_default();

        // Captures only `result.sync_notifier` (disjoint field capture), so
        // the other `result` fields remain usable below.
        let notify = || {
            result.sync_notifier.notify();
            for n in pending_notifiers {
                n.notify();
            }
        };

        // Discard stale results from a previous generation (e.g. after clear).
        if result.generation != self.generation {
            debug!(
                code_hash = %result.key.code_hash,
                result_gen = result.generation,
                current_gen = self.generation,
                "discarding stale worker result",
            );
            self.entries.remove(&result.key);
            notify();
            return;
        }

        let kind = result.kind;
        let success = result.outcome.is_ok();

        // Fire the user callback before publishing, for both outcomes.
        if let Some(cb) = &self.on_compilation {
            cb(CompilationEvent {
                code_hash: result.key.code_hash,
                spec_id: result.key.spec_id,
                duration: result.compile_duration,
                kind,
                success,
                timings: result.timings,
            });
        }

        match result.outcome {
            Ok(WorkerSuccess::Jit(success)) => {
                let program =
                    Arc::new(CompiledProgram::new_jit(result.key, success.func, success.backing));
                self.insert_resident(result.key, program);
                self.entries.remove(&result.key);
                self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);

                debug!(
                    code_hash = %result.key.code_hash,
                    spec_id = ?result.key.spec_id,
                    compile_time = ?result.compile_duration,
                    "JIT program published to resident map",
                );
            }
            Ok(WorkerSuccess::Aot(success)) => {
                self.handle_aot_success(result.key, success);
            }
            Err(err) => {
                // Entry removal lets a later miss retry from Cold.
                self.entries.remove(&result.key);
                self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);

                warn!(
                    code_hash = %result.key.code_hash,
                    error = %err,
                    compile_time = ?result.compile_duration,
                    "compilation failed",
                );
            }
        }

        notify();
    }

    /// Persists a successful AOT compile to the store, reloads it from the
    /// store's canonical path via dlopen, and publishes it to the resident
    /// map.
    ///
    /// Any failure along the way removes the tracking entry (so a later
    /// miss can retry) and counts as a failed compilation.
    fn handle_aot_success(&mut self, key: RuntimeCacheKey, success: AotSuccess) {
        let artifact_key = ArtifactKey {
            runtime: key,
            backend: BackendSelection::Llvm,
            opt_level: self.tuning.aot_opt_level,
        };

        // Content hash over the dylib bytes, recorded in the manifest.
        let content_hash = keccak256(&success.dylib_bytes).0;

        let manifest = ArtifactManifest {
            artifact_key: artifact_key.clone(),
            symbol_name: success.symbol_name.clone(),
            bytecode_len: success.bytecode_len,
            artifact_len: success.dylib_bytes.len(),
            created_at_unix_secs: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            content_hash,
        };

        // Persist to store if available.
        if let Some(store) = &self.store {
            if let Err(e) = store.store(&artifact_key, &manifest, &success.dylib_bytes) {
                warn!(
                    code_hash = %key.code_hash,
                    error = %e,
                    "failed to persist AOT artifact",
                );
                self.entries.remove(&key);
                self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
                return;
            }

            debug!(
                code_hash = %key.code_hash,
                spec_id = ?key.spec_id,
                dylib_len = success.dylib_bytes.len(),
                "AOT artifact persisted to store",
            );

            // Load from store to get the canonical path, then dlopen.
            match store.load(&artifact_key) {
                Ok(Some(stored)) => {
                    // Same dlopen-and-resolve pattern as `try_load_persisted_aot`.
                    match (|| -> eyre::Result<CompiledProgram> {
                        let library = unsafe { libloading::Library::new(&stored.dylib_path) }
                            .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
                        let func: EvmCompilerFn = unsafe {
                            let sym: libloading::Symbol<'_, EvmCompilerFn> =
                                library.get(success.symbol_name.as_bytes()).map_err(|e| {
                                    eyre::eyre!("symbol '{}': {e}", success.symbol_name)
                                })?;
                            *sym
                        };
                        let library = Arc::new(LoadedLibrary::new(library));
                        Ok(CompiledProgram::new_aot(key, func, library))
                    })() {
                        Ok(program) => {
                            self.insert_resident(key, Arc::new(program));
                            self.entries.remove(&key);
                            self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);

                            debug!(
                                code_hash = %key.code_hash,
                                spec_id = ?key.spec_id,
                                "AOT program loaded into resident map",
                            );
                        }
                        Err(e) => {
                            warn!(
                                code_hash = %key.code_hash,
                                error = %e,
                                "failed to load persisted AOT artifact",
                            );
                            // Persisted successfully but couldn't load — remove so JIT can retry.
                            self.entries.remove(&key);
                            self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
                Ok(None) => {
                    warn!(
                        code_hash = %key.code_hash,
                        "stored AOT artifact not found on reload",
                    );
                    self.entries.remove(&key);
                    self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
                }
                Err(e) => {
                    warn!(
                        code_hash = %key.code_hash,
                        error = %e,
                        "failed to reload persisted AOT artifact",
                    );
                    self.entries.remove(&key);
                    self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
                }
            }
        } else {
            // No store configured — can't persist, remove so JIT can retry.
            warn!(
                code_hash = %key.code_hash,
                "AOT compilation completed but no artifact store configured",
            );
            self.entries.remove(&key);
            self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Runs an eviction sweep: removes idle entries and enforces the memory budget.
    fn run_eviction_sweep(&mut self) {
        if !self.should_sweep() {
            return;
        }

        let now = Instant::now();
        self.last_sweep = now;

        let idle_duration = self.tuning.idle_evict_duration;
        let budget = self.tuning.resident_code_cache_bytes;

        // Phase 1: evict idle entries.
        if let Some(idle) = idle_duration {
            let idle_keys: Vec<RuntimeCacheKey> = self
                .resident_meta
                .iter()
                .filter(|(_, meta)| now.duration_since(meta.last_hit_at) > idle)
                .map(|(key, _)| *key)
                .collect();

            for key in &idle_keys {
                debug!(
                    code_hash = %key.code_hash,
                    spec_id = ?key.spec_id,
                    "evicting idle entry",
                );
                self.remove_resident(key);
                self.entries.remove(key);
                self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Phase 2: enforce memory budget by evicting LRU JIT entries.
        if budget > 0 && jit_total_bytes() > budget {
            // Collect JIT entries sorted by last_hit_at ascending (oldest first).
            // AOT entries are excluded because they don't contribute to `jit_total_bytes()`.
            let mut entries: Vec<(RuntimeCacheKey, Instant)> = self
                .resident_meta
                .iter()
                .filter(|(key, _)| {
                    self.inner.resident.get(key).is_some_and(|p| matches!(p.kind, ProgramKind::Jit))
                })
                .map(|(key, meta)| (*key, meta.last_hit_at))
                .collect();
            entries.sort_by_key(|(_, t)| *t);

            // Re-check usage after each eviction so we stop as soon as the
            // budget is satisfied.
            for (key, _) in entries {
                if jit_total_bytes() <= budget {
                    break;
                }
                debug!(
                    code_hash = %key.code_hash,
                    spec_id = ?key.spec_id,
                    "evicting entry to stay within memory budget",
                );
                self.remove_resident(&key);
                self.entries.remove(&key);
                self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Returns whether eviction is configured and a sweep is due.
    fn should_sweep(&self) -> bool {
        // Over budget — sweep immediately regardless of interval.
        let maxrss = self.tuning.resident_code_cache_bytes;
        if maxrss > 0 && jit_total_bytes() > maxrss {
            return true;
        }
        self.tuning.idle_evict_duration.is_some()
            && self.last_sweep.elapsed() >= self.tuning.eviction_sweep_interval
    }
}
676
677/// Runs the backend event loop. Called on the backend thread.
678pub(crate) fn run(
679    inner: Arc<super::BackendShared>,
680    cmd_rx: chan::Receiver<Command>,
681    config: RuntimeConfig,
682) {
683    debug!("backend thread started");
684
685    let (result_tx, result_rx) = chan::unbounded::<WorkerResult>();
686
687    let workers = WorkerPool::new(result_tx, config.clone());
688
689    let sweep_interval = config.tuning.eviction_sweep_interval;
690    let event_drain_interval = config.tuning.event_drain_interval;
691
692    // Seed resident metadata from startup-preloaded AOT entries.
693    let now = Instant::now();
694    let mut preload_meta = HashMap::default();
695    for entry in inner.resident.iter() {
696        preload_meta.insert(*entry.key(), ResidentMeta { last_hit_at: now });
697    }
698
699    let mut state = BackendState {
700        inner,
701        resident_meta: preload_meta,
702        entries: HashMap::default(),
703        workers,
704        result_rx,
705        store: config.store,
706        tuning: config.tuning,
707        aot: config.aot,
708        pending_jobs: 0,
709        generation: 0,
710        last_sweep: now,
711        on_compilation: config.on_compilation,
712    };
713
714    // Tick interval is min(event_drain, sweep) so we never sleep longer than
715    // either. Events are drained on every wakeup regardless of cause.
716    let tick = event_drain_interval.min(sweep_interval);
717    let shutdown_reason;
718
719    loop {
720        chan::select! {
721            recv(cmd_rx) -> msg => {
722                let Ok(cmd) = msg else {
723                    shutdown_reason = "channel closed";
724                    break;
725                };
726                if state.handle(cmd).is_break() {
727                    shutdown_reason = "shutdown command";
728                    break;
729                }
730            }
731            recv(state.result_rx) -> msg => {
732                match msg {
733                    Ok(result) => state.handle_worker_result(result),
734                    Err(_) => warn!("worker unexpectedly closed"),
735                }
736            }
737            default(tick) => {}
738        }
739        state.tick();
740    }
741
742    debug!(?shutdown_reason, stats = ?state.inner.stats(), "backend task shutting down");
743
744    state.workers.shutdown();
745    while state.result_rx.try_recv().is_ok() {}
746}