1use crate::{
9 EvmCompilerFn,
10 eyre::{self, WrapErr},
11};
12use api::LoadedLibrary;
13use backend::{Command, CompileJitRequest, EventQueue, PrepareAotRequest, ResidentMap};
14use crossbeam_channel as chan;
15use crossbeam_queue::ArrayQueue;
16use revm_primitives::{B256, hardfork::SpecId, hints_util::cold_path};
17use stats::RuntimeStats;
18use std::{
19 ops::ControlFlow,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicUsize, Ordering},
23 },
24 time::Duration,
25};
26use worker::SyncNotifier;
27
28mod api;
29pub use api::{
30 AotRequest, CompiledProgram, InterpretReason, LookupDecision, LookupRequest, ProgramKind,
31};
32
33mod config;
34pub use config::{CompilationEvent, CompilationKind, JitMode, RuntimeConfig, RuntimeTuning};
35
36mod backend;
37
38mod stats;
39pub use stats::RuntimeStatsSnapshot;
40
41mod storage;
42pub use storage::{
43 ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeArtifactStore,
44 RuntimeCacheKey, StoredArtifact,
45};
46
47#[cfg(all(feature = "llvm", unix))]
48mod out_of_process;
49
50mod worker;
51
52pub fn maybe_run_jit_helper() -> eyre::Result<ControlFlow<()>> {
58 #[cfg(all(feature = "llvm", unix))]
59 {
60 out_of_process::maybe_run_jit_helper()
61 }
62 #[cfg(not(all(feature = "llvm", unix)))]
63 {
64 if std::env::var_os("REVMC_JIT_HELPER").is_some() {
65 eyre::bail!("out-of-process JIT helper is only available on Unix with LLVM")
66 }
67 Ok(ControlFlow::Continue(()))
68 }
69}
70
71#[cfg(test)]
72mod tests;
73
74#[derive(derive_more::Debug)]
83pub(crate) struct BackendShared {
84 #[debug(skip)]
86 resident: ResidentMap,
87 #[debug(skip)]
89 events: EventQueue,
90 pause_depth: AtomicUsize,
92 #[debug(skip)]
94 stats: Arc<RuntimeStats>,
95}
96
97#[derive(derive_more::Debug)]
99pub(crate) struct BackendInner {
100 shared: Arc<BackendShared>,
102 enabled: AtomicBool,
104 started: AtomicBool,
106 blocking: bool,
108 tuning: crate::runtime::config::RuntimeTuning,
110 #[debug(skip)]
114 tx: chan::Sender<Command>,
115 #[debug(skip)]
117 thread: std::sync::Mutex<Option<BackendThread>>,
118 shutdown_timeout: Duration,
120 #[debug(skip)]
122 lazy_spawn: std::sync::Mutex<Option<LazySpawnState>>,
123}
124
125struct LazySpawnState {
127 rx: chan::Receiver<Command>,
128 config: RuntimeConfig,
129}
130
131struct BackendThread {
133 handle: std::thread::JoinHandle<()>,
134 done_rx: chan::Receiver<()>,
135}
136
137#[derive(Clone, Debug)]
143pub struct JitBackend {
144 inner: Arc<BackendInner>,
145}
146
147impl JitBackend {
148 pub fn disabled() -> Self {
154 Self::new_inner(RuntimeConfig::default()).expect("default config cannot fail")
155 }
156
157 pub fn new(mut config: RuntimeConfig) -> eyre::Result<Self> {
163 config = config.with_env_overrides()?;
164 Self::new_inner(config)
165 }
166
167 fn new_inner(mut config: RuntimeConfig) -> eyre::Result<Self> {
168 if config.blocking {
169 config.enabled = true;
170 config.tuning.jit_hot_threshold = 0;
171 }
172 #[cfg(not(unix))]
173 if config.jit_mode == JitMode::OutOfProcess {
174 eyre::bail!("out-of-process JIT is only available on Unix");
175 }
176
177 let enabled = config.enabled;
178 let (tx, rx) = chan::bounded::<Command>(config.tuning.channel_capacity);
179 let events = ArrayQueue::new(config.tuning.channel_capacity);
180 let tuning = config.tuning;
181 let shared = Arc::new(BackendShared {
182 resident: ResidentMap::default(),
183 events,
184 pause_depth: AtomicUsize::new(0),
185 stats: Arc::new(RuntimeStats::default()),
186 });
187 let this = Self {
188 inner: Arc::new(BackendInner {
189 shared,
190 enabled: AtomicBool::new(false),
191 started: AtomicBool::new(false),
192 blocking: config.blocking,
193 tx,
194 thread: std::sync::Mutex::new(None),
195 shutdown_timeout: config.tuning.shutdown_timeout,
196 tuning,
197 lazy_spawn: std::sync::Mutex::new(Some(LazySpawnState { rx, config })),
198 }),
199 };
200 this.set_enabled(enabled)?;
201 Ok(this)
202 }
203
204 pub fn lookup(&self, mut req: LookupRequest) -> LookupDecision {
209 let inner = &*self.inner;
210 let shared = &*inner.shared;
211
212 if !inner.enabled.load(Ordering::Relaxed) {
213 cold_path();
214 return LookupDecision::Interpret(InterpretReason::Disabled);
215 }
216 if inner.blocking {
217 cold_path();
218 return self.lookup_blocking(req);
219 }
220 if !inner.tuning.should_compile(&req.code) {
221 cold_path();
222 return LookupDecision::Interpret(InterpretReason::Ineligible);
223 }
224
225 let decision = if let Some(program_ref) = shared.resident.try_get(&req.key).try_unwrap() {
226 let program = Arc::clone(&program_ref);
227 drop(program_ref);
228 req.code.clear();
229 LookupDecision::Compiled(program)
230 } else {
231 LookupDecision::Interpret(InterpretReason::NotReady)
232 };
233
234 if let Err(_v) = shared.events.push(req) {
235 cold_path();
236 shared.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
237 }
238
239 decision
240 }
241
242 pub fn get_compiled(&self, code_hash: B256, spec_id: SpecId) -> Option<Arc<CompiledProgram>> {
244 let key = RuntimeCacheKey { code_hash, spec_id };
245 self.inner.shared.resident.get(&key).map(|entry| Arc::clone(&entry))
246 }
247
248 pub fn get_compiled_tracked(
250 &self,
251 code_hash: B256,
252 spec_id: SpecId,
253 ) -> Option<Arc<CompiledProgram>> {
254 let result = self.get_compiled(code_hash, spec_id);
255 if result.is_some() {
256 self.inner.shared.stats.lookup_hits.fetch_add(1, Ordering::Relaxed);
257 } else {
258 self.inner.shared.stats.lookup_misses.fetch_add(1, Ordering::Relaxed);
259 }
260 result
261 }
262
263 pub fn lookup_blocking(&self, req: LookupRequest) -> LookupDecision {
270 if !self.inner.tuning.should_compile(&req.code) {
271 return LookupDecision::Interpret(InterpretReason::Ineligible);
272 }
273 let code_hash = req.key.code_hash;
274 let spec_id = req.key.spec_id;
275 if let Some(program) = self.get_compiled_tracked(code_hash, spec_id) {
276 return LookupDecision::Compiled(program);
277 }
278 if self.compile_jit_sync(req).is_err() {
279 return LookupDecision::Interpret(InterpretReason::JitFailed);
280 }
281 match self.get_compiled_tracked(code_hash, spec_id) {
282 Some(program) => LookupDecision::Compiled(program),
283 None => LookupDecision::Interpret(InterpretReason::JitFailed),
284 }
285 }
286
287 pub fn compile_jit(&self, req: LookupRequest) {
291 let _ = self.ensure_started();
292 let cmd = Command::CompileJit(CompileJitRequest {
293 key: req.key,
294 bytecode: req.code,
295 sync_notifier: SyncNotifier::none(),
296 });
297 let _ = self.inner.tx.send(cmd);
298 }
299
300 pub fn compile_jit_sync(&self, req: LookupRequest) -> eyre::Result<()> {
306 self.ensure_started()?;
307 let (tx, rx) = chan::bounded(1);
308 let cmd = Command::CompileJit(CompileJitRequest {
309 key: req.key,
310 bytecode: req.code,
311 sync_notifier: SyncNotifier::new(tx),
312 });
313 self.inner.tx.send(cmd).map_err(|_| eyre::eyre!("backend channel closed"))?;
314 rx.recv().map_err(|_| eyre::eyre!("backend shut down before compilation completed"))
315 }
316
317 pub fn prepare_aot(&self, req: AotRequest) {
323 self.prepare_aot_batch(vec![req]);
324 }
325
326 pub fn prepare_aot_batch(&self, reqs: Vec<AotRequest>) {
330 let _ = self.ensure_started();
331 let owned: Vec<PrepareAotRequest> = reqs
332 .into_iter()
333 .map(|r| PrepareAotRequest {
334 key: RuntimeCacheKey { code_hash: r.code_hash, spec_id: r.spec_id },
335 bytecode: r.code,
336 })
337 .collect();
338 let cmd = Command::PrepareAot(owned);
339 let _ = self.inner.tx.send(cmd);
340 }
341
342 pub fn clear_resident(&self) {
347 let _ = self.inner.tx.send(Command::ClearResident);
348 }
349
350 pub fn clear_persisted(&self) {
352 let _ = self.inner.tx.send(Command::ClearPersisted);
353 }
354
355 pub fn clear_all(&self) {
357 let _ = self.inner.tx.send(Command::ClearAll);
358 }
359
360 pub fn enabled(&self) -> bool {
362 self.inner.enabled.load(Ordering::Relaxed)
363 }
364
365 pub fn pause(&self) {
377 if self.inner.shared.pause_depth.fetch_add(1, Ordering::Relaxed) == 0 {
378 self.try_send_control(Command::Pause);
379 }
380 }
381
382 pub fn resume(&self) {
386 if self
387 .inner
388 .shared
389 .pause_depth
390 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |depth| {
391 Some(depth.saturating_sub(1))
392 })
393 .is_ok_and(|depth| depth == 1)
394 {
395 self.try_send_control(Command::Resume);
396 }
397 }
398
399 fn try_send_control(&self, cmd: Command) {
412 if !self.inner.started.load(Ordering::Relaxed) {
413 return;
414 }
415 if self.inner.tx.try_send(cmd).is_err() {
416 self.inner.shared.stats.commands_dropped.fetch_add(1, Ordering::Relaxed);
417 }
418 }
419
420 pub fn is_paused(&self) -> bool {
422 self.inner.shared.pause_depth.load(Ordering::Relaxed) != 0
423 }
424
425 pub fn set_enabled(&self, enabled: bool) -> eyre::Result<()> {
430 debug!(enabled, "set_enabled");
431 self.inner.enabled.store(enabled, Ordering::Relaxed);
432 if enabled {
433 self.ensure_started()?;
434 }
435 Ok(())
436 }
437
438 pub fn stats(&self) -> RuntimeStatsSnapshot {
440 self.inner.stats()
441 }
442
443 fn ensure_started(&self) -> eyre::Result<()> {
445 let mut guard = self.inner.lazy_spawn.lock().unwrap();
446 let Some(lazy) = guard.take() else {
447 return Ok(());
448 };
449
450 let LazySpawnState { rx, config } = lazy;
451
452 debug!(
453 blocking = self.inner.blocking,
454 workers = config.tuning.jit_worker_count,
455 jit_mode = ?config.jit_mode,
456 hot_threshold = config.tuning.jit_hot_threshold,
457 channel_capacity = config.tuning.channel_capacity,
458 "spawning backend thread",
459 );
460
461 match Self::preload_aot(config.store.as_deref()) {
463 Ok(entries) => {
464 for (key, prog) in entries {
465 self.inner.shared.resident.insert(key, prog);
466 }
467 }
468 Err(e) => {
469 *guard = Some(LazySpawnState { rx, config });
471 return Err(e);
472 }
473 }
474
475 drop(guard);
476
477 let (done_tx, done_rx) = chan::bounded::<()>(1);
478 let shared = Arc::clone(&self.inner.shared);
479
480 let thread = std::thread::Builder::new()
481 .name(config.thread_name.clone())
482 .spawn(move || {
483 backend::run(shared, rx, config);
484 let _ = done_tx.send(());
485 })
486 .wrap_err("failed to spawn backend thread")?;
487
488 *self.inner.thread.lock().unwrap() = Some(BackendThread { handle: thread, done_rx });
489 self.inner.started.store(true, Ordering::Relaxed);
490 Ok(())
491 }
492
493 fn preload_aot(
495 store: Option<&dyn ArtifactStore>,
496 ) -> eyre::Result<Vec<(RuntimeCacheKey, Arc<CompiledProgram>)>> {
497 let Some(store) = store else {
498 debug!("no artifact store configured, skipping AOT preload");
499 return Ok(Vec::new());
500 };
501
502 let span = info_span!("aot_preload");
503 let _enter = span.enter();
504
505 let artifacts = store.load_all()?;
506 info!(count = artifacts.len(), "loading AOT artifacts");
507
508 let mut out: Vec<(RuntimeCacheKey, Arc<CompiledProgram>)> =
509 Vec::with_capacity(artifacts.len());
510 let mut seen = alloy_primitives::map::HashSet::<RuntimeCacheKey>::default();
511 let mut loaded = 0u64;
512 let mut failed = 0u64;
513
514 for (artifact_key, stored) in artifacts {
515 match Self::load_artifact(&artifact_key, &stored) {
516 Ok(program) => {
517 let key = artifact_key.runtime;
518 if !seen.insert(key) {
519 warn!(
520 code_hash = %key.code_hash,
521 spec_id = ?key.spec_id,
522 "duplicate artifact key, keeping first",
523 );
524 continue;
525 }
526 out.push((key, Arc::new(program)));
527 loaded += 1;
528 }
529 Err(e) => {
530 warn!(
531 code_hash = %artifact_key.runtime.code_hash,
532 error = %e,
533 "failed to load artifact, skipping",
534 );
535 failed += 1;
536 }
537 }
538 }
539
540 info!(loaded, failed, "AOT preload complete");
541 Ok(out)
542 }
543
544 fn load_artifact(key: &ArtifactKey, stored: &StoredArtifact) -> eyre::Result<CompiledProgram> {
546 let library = unsafe { libloading::Library::new(&stored.dylib_path) }
547 .wrap_err_with(|| format!("dlopen {:?}", stored.dylib_path))?;
548
549 let func: EvmCompilerFn = unsafe {
550 let sym: libloading::Symbol<'_, EvmCompilerFn> = library
551 .get(stored.manifest.symbol_name.as_bytes())
552 .wrap_err_with(|| format!("symbol '{}'", stored.manifest.symbol_name))?;
553 *sym
554 };
555
556 let library = Arc::new(LoadedLibrary::new(library));
557 Ok(CompiledProgram::new_aot(key.runtime, func, library))
558 }
559}
560
561impl BackendShared {
562 pub(crate) fn stats(&self) -> RuntimeStatsSnapshot {
567 self.stats.snapshot(stats::RuntimeStatsGauges {
568 resident_entries: self.resident.len() as u64,
569 events_queued: self.events.len() as u64,
570 command_queue_len: 0,
571 })
572 }
573}
574
575impl BackendInner {
576 pub(crate) fn stats(&self) -> RuntimeStatsSnapshot {
578 let mut snap = self.shared.stats();
579 snap.command_queue_len = self.tx.len() as u64;
580 snap
581 }
582
583 fn shutdown(&self) -> eyre::Result<()> {
584 debug!("shutting down JIT backend");
585 if let Some(ct) = self.thread.lock().unwrap().take() {
586 let _ = self.tx.send(Command::Shutdown);
588
589 match ct.done_rx.recv_timeout(self.shutdown_timeout) {
591 Ok(()) | Err(chan::RecvTimeoutError::Disconnected) => {}
592 Err(chan::RecvTimeoutError::Timeout) => {
593 eyre::bail!(
594 "backend thread did not exit within timeout ({:?})",
595 self.shutdown_timeout
596 );
597 }
598 }
599
600 ct.handle.join().map_err(|_| eyre::eyre!("backend thread panicked"))?;
602 }
603 Ok(())
604 }
605}
606
607impl Drop for BackendInner {
608 fn drop(&mut self) {
609 if let Err(err) = self.shutdown() {
610 warn!(%err, "failed to shutdown JIT backend");
611 }
612 }
613}