1use crate::{
2 EvmCompilerFn, eyre,
3 runtime::{
4 LookupRequest,
5 api::{CompiledProgram, LoadedLibrary, ProgramKind},
6 config::{CompilationEvent, CompilationKind, RuntimeConfig, RuntimeTuning},
7 storage::{
8 ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeCacheKey,
9 },
10 worker::{AotSuccess, CompileJob, SyncNotifier, WorkerPool, WorkerResult, WorkerSuccess},
11 },
12};
13use alloy_primitives::{
14 Bytes, keccak256,
15 map::{DefaultHashBuilder, HashMap},
16};
17use crossbeam_channel as chan;
18use crossbeam_queue::ArrayQueue;
19use dashmap::DashMap;
20use quanta::Instant;
21use std::{
22 mem,
23 ops::ControlFlow,
24 sync::{Arc, atomic::Ordering},
25 time::{SystemTime, UNIX_EPOCH},
26};
27
28#[cfg(feature = "llvm")]
29use crate::llvm::jit_memory_usage;
30
31pub(crate) type ResidentMap = DashMap<RuntimeCacheKey, Arc<CompiledProgram>, DefaultHashBuilder>;
33
34pub(crate) type EventQueue = ArrayQueue<LookupRequest>;
40
41struct ResidentMeta {
43 last_hit_at: Instant,
45}
46
/// Total JIT memory usage in bytes, as reported by `crate::llvm::jit_memory_usage`.
///
/// Returns 0 when no usage is reported.
#[cfg(feature = "llvm")]
fn jit_total_bytes() -> usize {
    jit_memory_usage().map_or(0, |usage| usage.total_bytes())
}

/// Total JIT memory usage in bytes. Without the `llvm` feature there is no JIT, so this is
/// always 0.
#[cfg(not(feature = "llvm"))]
fn jit_total_bytes() -> usize {
    0
}
58
59pub(crate) enum Command {
64 CompileJit(CompileJitRequest),
66 PrepareAot(Vec<PrepareAotRequest>),
68 ClearResident,
70 ClearPersisted,
72 ClearAll,
74 Shutdown,
76}
77
78pub(crate) struct CompileJitRequest {
80 pub(crate) key: RuntimeCacheKey,
82 pub(crate) bytecode: Bytes,
84 pub(crate) sync_notifier: SyncNotifier,
86}
87
88pub(crate) struct PrepareAotRequest {
90 pub(crate) key: RuntimeCacheKey,
92 pub(crate) bytecode: Bytes,
94}
95
96struct EntryState {
98 hotness: u32,
100 phase: EntryPhase,
102 bytecode: Bytes,
104 pending_notifiers: Vec<SyncNotifier>,
106}
107
/// Lifecycle of an admission entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryPhase {
    /// No job is in flight; observed misses accumulate hotness.
    Cold,
    /// A compile job has been dispatched and its result has not been handled yet.
    Working,
}
116
/// How a compilation candidate reached `try_admit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmitMode {
    /// From a lookup miss. Subject to the entry-table cap and the hotness threshold.
    Observed,
    /// From an explicit command. Bypasses the entry-table cap and the hotness threshold.
    Explicit,
}
125
126struct BackendState {
128 inner: Arc<super::BackendShared>,
130 resident_meta: HashMap<RuntimeCacheKey, ResidentMeta>,
132 entries: HashMap<RuntimeCacheKey, EntryState>,
134 workers: WorkerPool,
136 result_rx: chan::Receiver<WorkerResult>,
138 store: Option<Arc<dyn ArtifactStore>>,
140 tuning: RuntimeTuning,
142 aot: bool,
144 pending_jobs: usize,
146 generation: u64,
148 last_sweep: Instant,
150 on_compilation: Option<Arc<dyn Fn(CompilationEvent) + Send + Sync>>,
152}
153
154impl BackendState {
155 fn handle(&mut self, cmd: Command) -> ControlFlow<()> {
156 match cmd {
157 Command::CompileJit(req) => self.handle_compile_jit(req),
158 Command::PrepareAot(reqs) => self.handle_prepare_aot(reqs),
159 Command::ClearResident => self.handle_clear_resident(),
160 Command::ClearPersisted => self.handle_clear_persisted(),
161 Command::ClearAll => self.handle_clear_all(),
162 Command::Shutdown => return ControlFlow::Break(()),
163 }
164 ControlFlow::Continue(())
165 }
166
167 fn tick(&mut self) {
168 self.drain_events();
169 self.run_eviction_sweep();
170 }
171
172 fn drain_events(&mut self) {
174 for _ in 0..self.tuning.max_events_per_drain {
178 let Some(event) = self.inner.events.pop() else { break };
179 self.handle_lookup_observed(event);
180 }
181 }
182
183 fn handle_lookup_observed(&mut self, event: LookupRequest) {
184 let hit = event.code.is_empty();
185 if hit {
186 self.inner.stats.lookup_hits.fetch_add(1, Ordering::Relaxed);
187 if let Some(meta) = self.resident_meta.get_mut(&event.key) {
188 meta.last_hit_at = Instant::now();
189 }
190 } else {
191 self.inner.stats.lookup_misses.fetch_add(1, Ordering::Relaxed);
192 let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
193 self.try_admit(kind, event.key, event.code, SyncNotifier::none(), AdmitMode::Observed);
194 }
195 }
196
197 fn handle_compile_jit(&mut self, req: CompileJitRequest) {
198 let kind = if self.aot { CompilationKind::Aot } else { CompilationKind::Jit };
199 self.try_admit(kind, req.key, req.bytecode, req.sync_notifier, AdmitMode::Explicit);
200 }
201
202 fn handle_prepare_aot(&mut self, reqs: Vec<PrepareAotRequest>) {
203 for req in reqs {
204 self.try_admit(
205 CompilationKind::Aot,
206 req.key,
207 req.bytecode,
208 SyncNotifier::none(),
209 AdmitMode::Explicit,
210 );
211 }
212 }
213
214 fn try_admit(
220 &mut self,
221 kind: CompilationKind,
222 key: RuntimeCacheKey,
223 bytecode: Bytes,
224 sync_notifier: SyncNotifier,
225 mode: AdmitMode,
226 ) {
227 if self.inner.resident.contains_key(&key) {
228 sync_notifier.notify();
229 return;
230 }
231
232 if !self.tuning.should_compile(&bytecode) {
233 sync_notifier.notify();
234 return;
235 }
236
237 if matches!(mode, AdmitMode::Observed) {
238 let max_entries = self.tuning.jit_max_pending_jobs * 10;
239 if !self.entries.contains_key(&key) && self.entries.len() >= max_entries {
240 return;
241 }
242 }
243
244 if kind == CompilationKind::Aot && self.try_load_persisted_aot(&key) {
245 sync_notifier.notify();
246 return;
247 }
248
249 let entry = self.entries.entry(key).or_insert_with(|| EntryState {
250 hotness: 0,
251 phase: EntryPhase::Cold,
252 bytecode: bytecode.clone(),
253 pending_notifiers: Vec::new(),
254 });
255
256 if entry.phase == EntryPhase::Working {
257 entry.pending_notifiers.push(sync_notifier);
258 return;
259 }
260
261 if matches!(mode, AdmitMode::Observed) {
262 entry.hotness = entry.hotness.saturating_add(1);
263 if (entry.hotness as usize) < self.tuning.jit_hot_threshold {
264 return;
265 }
266 }
267
268 if self.pending_jobs >= self.tuning.jit_max_pending_jobs {
269 sync_notifier.notify();
270 return;
271 }
272
273 let prefix = match kind {
274 CompilationKind::Jit => "jit",
275 CompilationKind::Aot => "aot",
276 };
277 let opt_level = match kind {
278 CompilationKind::Jit => self.tuning.jit_opt_level,
279 CompilationKind::Aot => self.tuning.aot_opt_level,
280 };
281 let symbol = format!("{prefix}_{:x}_{:?}", key.code_hash, key.spec_id);
282 let job = CompileJob {
283 kind,
284 key,
285 bytecode: entry.bytecode.clone(),
286 symbol_name: symbol,
287 opt_level,
288 sync_notifier,
289 generation: self.generation,
290 };
291
292 match self.workers.try_send(job) {
293 Ok(()) => {
294 debug!(
295 code_hash = %key.code_hash,
296 spec_id = ?key.spec_id,
297 ?kind,
298 hotness = entry.hotness,
299 pending_jobs = self.pending_jobs + 1,
300 "dispatched compilation",
301 );
302 entry.phase = EntryPhase::Working;
303 self.pending_jobs += 1;
304 self.inner.stats.compilations_dispatched.fetch_add(1, Ordering::Relaxed);
305 }
306 Err(job) => {
307 warn!(code_hash = %key.code_hash, "worker pool saturated, dropping request");
308 job.sync_notifier.notify();
309 }
310 }
311 }
312
313 fn try_load_persisted_aot(&mut self, key: &RuntimeCacheKey) -> bool {
316 let store = match &self.store {
317 Some(s) => s,
318 None => return false,
319 };
320
321 let artifact_key = ArtifactKey {
322 runtime: *key,
323 backend: BackendSelection::Llvm,
324 opt_level: self.tuning.aot_opt_level,
325 };
326
327 match store.load(&artifact_key) {
328 Ok(Some(stored)) => {
329 match (|| -> eyre::Result<CompiledProgram> {
330 let library = unsafe { libloading::Library::new(&stored.dylib_path) }
331 .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
332 let func: EvmCompilerFn = unsafe {
333 let sym: libloading::Symbol<'_, EvmCompilerFn> =
334 library.get(stored.manifest.symbol_name.as_bytes()).map_err(|e| {
335 eyre::eyre!("symbol '{}': {e}", stored.manifest.symbol_name)
336 })?;
337 *sym
338 };
339 let library = Arc::new(LoadedLibrary::new(library));
340 Ok(CompiledProgram::new_aot(*key, func, library))
341 })() {
342 Ok(program) => {
343 debug!(
344 code_hash = %key.code_hash,
345 spec_id = ?key.spec_id,
346 "loaded existing AOT artifact from store, skipping recompilation",
347 );
348 self.insert_resident(*key, Arc::new(program));
349 true
350 }
351 Err(e) => {
352 warn!(
353 code_hash = %key.code_hash,
354 error = %e,
355 "failed to load persisted AOT artifact, will recompile",
356 );
357 false
358 }
359 }
360 }
361 Ok(None) => false,
362 Err(e) => {
363 warn!(
364 code_hash = %key.code_hash,
365 error = %e,
366 "failed to probe artifact store",
367 );
368 false
369 }
370 }
371 }
372
373 fn handle_clear_resident(&mut self) {
374 self.inner.resident.clear();
375 self.resident_meta.clear();
376 for (_, entry) in self.entries.drain() {
378 for n in entry.pending_notifiers {
379 n.notify();
380 }
381 }
382 while self.inner.events.pop().is_some() {}
385 self.generation += 1;
387 debug!(generation = self.generation, "resident map cleared");
388 }
389
390 fn handle_clear_persisted(&mut self) {
391 if let Some(store) = &self.store {
392 if let Err(e) = store.clear() {
393 warn!(error = %e, "failed to clear artifact store");
394 } else {
395 debug!("artifact store cleared");
396 }
397 }
398 }
399
400 fn handle_clear_all(&mut self) {
401 self.handle_clear_resident();
402 self.handle_clear_persisted();
403 }
404
405 fn insert_resident(&mut self, key: RuntimeCacheKey, program: Arc<CompiledProgram>) {
406 self.inner.resident.insert(key, program);
407 self.resident_meta.insert(key, ResidentMeta { last_hit_at: Instant::now() });
408 }
409
410 fn remove_resident(&mut self, key: &RuntimeCacheKey) {
411 self.inner.resident.remove(key);
412 self.resident_meta.remove(key);
413 }
414
415 fn handle_worker_result(&mut self, result: WorkerResult) {
416 self.pending_jobs = self.pending_jobs.saturating_sub(1);
417
418 let pending_notifiers = self
420 .entries
421 .get_mut(&result.key)
422 .map(|e| mem::take(&mut e.pending_notifiers))
423 .unwrap_or_default();
424
425 let notify = || {
426 result.sync_notifier.notify();
427 for n in pending_notifiers {
428 n.notify();
429 }
430 };
431
432 if result.generation != self.generation {
434 debug!(
435 code_hash = %result.key.code_hash,
436 result_gen = result.generation,
437 current_gen = self.generation,
438 "discarding stale worker result",
439 );
440 self.entries.remove(&result.key);
441 notify();
442 return;
443 }
444
445 let kind = result.kind;
446 let success = result.outcome.is_ok();
447
448 if let Some(cb) = &self.on_compilation {
449 cb(CompilationEvent {
450 code_hash: result.key.code_hash,
451 spec_id: result.key.spec_id,
452 duration: result.compile_duration,
453 kind,
454 success,
455 timings: result.timings,
456 });
457 }
458
459 match result.outcome {
460 Ok(WorkerSuccess::Jit(success)) => {
461 let program =
462 Arc::new(CompiledProgram::new_jit(result.key, success.func, success.backing));
463 self.insert_resident(result.key, program);
464 self.entries.remove(&result.key);
465 self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);
466
467 debug!(
468 code_hash = %result.key.code_hash,
469 spec_id = ?result.key.spec_id,
470 compile_time = ?result.compile_duration,
471 "JIT program published to resident map",
472 );
473 }
474 Ok(WorkerSuccess::Aot(success)) => {
475 self.handle_aot_success(result.key, success);
476 }
477 Err(err) => {
478 self.entries.remove(&result.key);
479 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
480
481 warn!(
482 code_hash = %result.key.code_hash,
483 error = %err,
484 compile_time = ?result.compile_duration,
485 "compilation failed",
486 );
487 }
488 }
489
490 notify();
491 }
492
493 fn handle_aot_success(&mut self, key: RuntimeCacheKey, success: AotSuccess) {
494 let artifact_key = ArtifactKey {
495 runtime: key,
496 backend: BackendSelection::Llvm,
497 opt_level: self.tuning.aot_opt_level,
498 };
499
500 let content_hash = keccak256(&success.dylib_bytes).0;
501
502 let manifest = ArtifactManifest {
503 artifact_key: artifact_key.clone(),
504 symbol_name: success.symbol_name.clone(),
505 bytecode_len: success.bytecode_len,
506 artifact_len: success.dylib_bytes.len(),
507 created_at_unix_secs: SystemTime::now()
508 .duration_since(UNIX_EPOCH)
509 .map(|d| d.as_secs())
510 .unwrap_or(0),
511 content_hash,
512 };
513
514 if let Some(store) = &self.store {
516 if let Err(e) = store.store(&artifact_key, &manifest, &success.dylib_bytes) {
517 warn!(
518 code_hash = %key.code_hash,
519 error = %e,
520 "failed to persist AOT artifact",
521 );
522 self.entries.remove(&key);
523 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
524 return;
525 }
526
527 debug!(
528 code_hash = %key.code_hash,
529 spec_id = ?key.spec_id,
530 dylib_len = success.dylib_bytes.len(),
531 "AOT artifact persisted to store",
532 );
533
534 match store.load(&artifact_key) {
536 Ok(Some(stored)) => {
537 match (|| -> eyre::Result<CompiledProgram> {
538 let library = unsafe { libloading::Library::new(&stored.dylib_path) }
539 .map_err(|e| eyre::eyre!("dlopen {:?}: {e}", stored.dylib_path))?;
540 let func: EvmCompilerFn = unsafe {
541 let sym: libloading::Symbol<'_, EvmCompilerFn> =
542 library.get(success.symbol_name.as_bytes()).map_err(|e| {
543 eyre::eyre!("symbol '{}': {e}", success.symbol_name)
544 })?;
545 *sym
546 };
547 let library = Arc::new(LoadedLibrary::new(library));
548 Ok(CompiledProgram::new_aot(key, func, library))
549 })() {
550 Ok(program) => {
551 self.insert_resident(key, Arc::new(program));
552 self.entries.remove(&key);
553 self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed);
554
555 debug!(
556 code_hash = %key.code_hash,
557 spec_id = ?key.spec_id,
558 "AOT program loaded into resident map",
559 );
560 }
561 Err(e) => {
562 warn!(
563 code_hash = %key.code_hash,
564 error = %e,
565 "failed to load persisted AOT artifact",
566 );
567 self.entries.remove(&key);
569 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
570 }
571 }
572 }
573 Ok(None) => {
574 warn!(
575 code_hash = %key.code_hash,
576 "stored AOT artifact not found on reload",
577 );
578 self.entries.remove(&key);
579 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
580 }
581 Err(e) => {
582 warn!(
583 code_hash = %key.code_hash,
584 error = %e,
585 "failed to reload persisted AOT artifact",
586 );
587 self.entries.remove(&key);
588 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
589 }
590 }
591 } else {
592 warn!(
594 code_hash = %key.code_hash,
595 "AOT compilation completed but no artifact store configured",
596 );
597 self.entries.remove(&key);
598 self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed);
599 }
600 }
601
602 fn run_eviction_sweep(&mut self) {
604 if !self.should_sweep() {
605 return;
606 }
607
608 let now = Instant::now();
609 self.last_sweep = now;
610
611 let idle_duration = self.tuning.idle_evict_duration;
612 let budget = self.tuning.resident_code_cache_bytes;
613
614 if let Some(idle) = idle_duration {
616 let idle_keys: Vec<RuntimeCacheKey> = self
617 .resident_meta
618 .iter()
619 .filter(|(_, meta)| now.duration_since(meta.last_hit_at) > idle)
620 .map(|(key, _)| *key)
621 .collect();
622
623 for key in &idle_keys {
624 debug!(
625 code_hash = %key.code_hash,
626 spec_id = ?key.spec_id,
627 "evicting idle entry",
628 );
629 self.remove_resident(key);
630 self.entries.remove(key);
631 self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
632 }
633 }
634
635 if budget > 0 && jit_total_bytes() > budget {
637 let mut entries: Vec<(RuntimeCacheKey, Instant)> = self
640 .resident_meta
641 .iter()
642 .filter(|(key, _)| {
643 self.inner.resident.get(key).is_some_and(|p| matches!(p.kind, ProgramKind::Jit))
644 })
645 .map(|(key, meta)| (*key, meta.last_hit_at))
646 .collect();
647 entries.sort_by_key(|(_, t)| *t);
648
649 for (key, _) in entries {
650 if jit_total_bytes() <= budget {
651 break;
652 }
653 debug!(
654 code_hash = %key.code_hash,
655 spec_id = ?key.spec_id,
656 "evicting entry to stay within memory budget",
657 );
658 self.remove_resident(&key);
659 self.entries.remove(&key);
660 self.inner.stats.evictions.fetch_add(1, Ordering::Relaxed);
661 }
662 }
663 }
664
665 fn should_sweep(&self) -> bool {
667 let maxrss = self.tuning.resident_code_cache_bytes;
669 if maxrss > 0 && jit_total_bytes() > maxrss {
670 return true;
671 }
672 self.tuning.idle_evict_duration.is_some()
673 && self.last_sweep.elapsed() >= self.tuning.eviction_sweep_interval
674 }
675}
676
677pub(crate) fn run(
679 inner: Arc<super::BackendShared>,
680 cmd_rx: chan::Receiver<Command>,
681 config: RuntimeConfig,
682) {
683 debug!("backend thread started");
684
685 let (result_tx, result_rx) = chan::unbounded::<WorkerResult>();
686
687 let workers = WorkerPool::new(result_tx, config.clone());
688
689 let sweep_interval = config.tuning.eviction_sweep_interval;
690 let event_drain_interval = config.tuning.event_drain_interval;
691
692 let now = Instant::now();
694 let mut preload_meta = HashMap::default();
695 for entry in inner.resident.iter() {
696 preload_meta.insert(*entry.key(), ResidentMeta { last_hit_at: now });
697 }
698
699 let mut state = BackendState {
700 inner,
701 resident_meta: preload_meta,
702 entries: HashMap::default(),
703 workers,
704 result_rx,
705 store: config.store,
706 tuning: config.tuning,
707 aot: config.aot,
708 pending_jobs: 0,
709 generation: 0,
710 last_sweep: now,
711 on_compilation: config.on_compilation,
712 };
713
714 let tick = event_drain_interval.min(sweep_interval);
717 let shutdown_reason;
718
719 loop {
720 chan::select! {
721 recv(cmd_rx) -> msg => {
722 let Ok(cmd) = msg else {
723 shutdown_reason = "channel closed";
724 break;
725 };
726 if state.handle(cmd).is_break() {
727 shutdown_reason = "shutdown command";
728 break;
729 }
730 }
731 recv(state.result_rx) -> msg => {
732 match msg {
733 Ok(result) => state.handle_worker_result(result),
734 Err(_) => warn!("worker unexpectedly closed"),
735 }
736 }
737 default(tick) => {}
738 }
739 state.tick();
740 }
741
742 debug!(?shutdown_reason, stats = ?state.inner.stats(), "backend task shutting down");
743
744 state.workers.shutdown();
745 while state.result_rx.try_recv().is_ok() {}
746}