1use crate::{
9 EvmCompilerFn,
10 eyre::{self, WrapErr},
11};
12use api::LoadedLibrary;
13use backend::{Command, CompileJitRequest, EventQueue, PrepareAotRequest, ResidentMap};
14use crossbeam_channel as chan;
15use crossbeam_queue::ArrayQueue;
16use revm_primitives::{B256, hardfork::SpecId, hints_util::cold_path};
17use stats::RuntimeStats;
18use std::{
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::Duration,
24};
25use worker::SyncNotifier;
26
27mod api;
28pub use api::{
29 AotRequest, CompiledProgram, InterpretReason, LookupDecision, LookupRequest, ProgramKind,
30};
31
32mod config;
33pub use config::{CompilationEvent, CompilationKind, RuntimeConfig, RuntimeTuning};
34
35mod backend;
36
37mod stats;
38pub use stats::RuntimeStatsSnapshot;
39
40mod storage;
41pub use storage::{
42 ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeArtifactStore,
43 RuntimeCacheKey, StoredArtifact,
44};
45
46mod worker;
47
48#[cfg(test)]
49mod tests;
50
/// State shared between the cloneable [`JitBackend`] handle and the background
/// compilation thread (see `ensure_started`, which moves a clone of the `Arc`
/// into the spawned thread).
#[derive(derive_more::Debug)]
pub(crate) struct BackendShared {
    /// Compiled programs currently resident in memory, keyed by
    /// `RuntimeCacheKey { code_hash, spec_id }` (see `get_compiled`).
    #[debug(skip)]
    resident: ResidentMap,
    /// Fixed-capacity lock-free queue (`ArrayQueue`, sized to
    /// `tuning.channel_capacity` in `JitBackend::new`) that `lookup` pushes
    /// every eligible request into; presumably drained by the backend thread
    /// for hotness tracking — consumer is not visible in this file.
    #[debug(skip)]
    events: EventQueue,
    /// Atomic counters (`lookup_hits`, `lookup_misses`, `events_dropped`, ...)
    /// snapshotted via `BackendShared::stats`.
    #[debug(skip)]
    stats: RuntimeStats,
}
71
/// Internal state behind the cheaply-cloneable [`JitBackend`] handle.
/// Dropped when the last handle goes away, which triggers `shutdown` (see the
/// `Drop` impl at the bottom of this file).
#[derive(derive_more::Debug)]
pub(crate) struct BackendInner {
    /// State shared with the background compilation thread.
    shared: Arc<BackendShared>,
    /// Whether `lookup` may return compiled code; toggled by `set_enabled`,
    /// read with `Ordering::Relaxed` on the hot path.
    enabled: AtomicBool,
    /// When true, `lookup` delegates to `lookup_blocking` and compiles
    /// synchronously instead of queueing background work.
    blocking: bool,
    /// Tuning knobs copied out of the `RuntimeConfig` at construction.
    tuning: crate::runtime::config::RuntimeTuning,
    /// Command channel to the backend thread. Sends are fire-and-forget in
    /// most call sites (`let _ = ...send(...)`).
    #[debug(skip)]
    tx: chan::Sender<Command>,
    /// Join handle + completion channel of the spawned backend thread;
    /// `None` until `ensure_started` has run, taken again by `shutdown`.
    #[debug(skip)]
    thread: std::sync::Mutex<Option<BackendThread>>,
    /// How long `shutdown` waits for the backend thread to acknowledge exit.
    shutdown_timeout: Duration,
    /// Command receiver + full config, parked here until the first
    /// `ensure_started` call consumes them to spawn the thread lazily.
    #[debug(skip)]
    lazy_spawn: std::sync::Mutex<Option<LazySpawnState>>,
}
97
/// Everything needed to spawn the backend thread later: the receiving end of
/// the command channel created in `JitBackend::new` and the original config.
/// Consumed exactly once by `ensure_started` (and restored there if AOT
/// preload fails, so a later call can retry).
struct LazySpawnState {
    rx: chan::Receiver<Command>,
    config: RuntimeConfig,
}
103
/// Handle to the running backend thread.
struct BackendThread {
    /// Join handle; joined in `shutdown` after the done signal (or timeout).
    handle: std::thread::JoinHandle<()>,
    /// Signalled (one `()` message) by the thread right after `backend::run`
    /// returns, letting `shutdown` wait with a timeout before joining.
    done_rx: chan::Receiver<()>,
}
109
/// Public, cloneable handle to the JIT compilation backend. Clones share the
/// same `BackendInner`; the backend shuts down when the last clone is dropped.
#[derive(Clone, Debug)]
pub struct JitBackend {
    inner: Arc<BackendInner>,
}
119
impl JitBackend {
    /// Constructs a backend from the default [`RuntimeConfig`].
    /// Presumably the default config has `enabled == false` (hence the name) —
    /// TODO confirm against `RuntimeConfig::default`.
    pub fn disabled() -> Self {
        Self::new(RuntimeConfig::default()).expect("default config cannot fail")
    }

    /// Builds a backend from `config`. The worker thread is NOT spawned here;
    /// it is started lazily by `ensure_started` (via `set_enabled` below when
    /// `config.enabled` is true, or on the first compile request otherwise).
    ///
    /// # Errors
    /// Propagates failures from `set_enabled` (i.e. AOT preload or thread
    /// spawn errors when the config starts enabled).
    pub fn new(mut config: RuntimeConfig) -> eyre::Result<Self> {
        // Blocking mode implies the backend is usable and that every contract
        // is considered hot immediately (threshold 0).
        if config.blocking {
            config.enabled = true;
            config.tuning.jit_hot_threshold = 0;
        }

        let enabled = config.enabled;
        // Command channel and event queue share the same configured capacity.
        let (tx, rx) = chan::bounded::<Command>(config.tuning.channel_capacity);
        let events = ArrayQueue::new(config.tuning.channel_capacity);
        let tuning = config.tuning;
        let shared = Arc::new(BackendShared {
            resident: ResidentMap::default(),
            events,
            stats: RuntimeStats::default(),
        });
        let this = Self {
            inner: Arc::new(BackendInner {
                shared,
                // Starts false; `set_enabled(enabled)` below flips it so the
                // enable path (incl. thread spawn) is shared with runtime toggling.
                enabled: AtomicBool::new(false),
                blocking: config.blocking,
                tx,
                thread: std::sync::Mutex::new(None),
                shutdown_timeout: config.tuning.shutdown_timeout,
                tuning,
                // Park the receiver + config for the lazy spawn.
                lazy_spawn: std::sync::Mutex::new(Some(LazySpawnState { rx, config })),
            }),
        };
        this.set_enabled(enabled)?;
        Ok(this)
    }

    /// Hot-path lookup: decides whether `req` should run compiled code or be
    /// interpreted, and records the request in the event queue for the
    /// backend thread (presumably for hotness tracking — consumer not visible
    /// in this file).
    pub fn lookup(&self, mut req: LookupRequest) -> LookupDecision {
        let inner = &*self.inner;
        let shared = &*inner.shared;

        if !inner.enabled.load(Ordering::Relaxed) {
            cold_path();
            return LookupDecision::Interpret(InterpretReason::Disabled);
        }
        if inner.blocking {
            cold_path();
            // Blocking mode compiles synchronously instead of queueing.
            return self.lookup_blocking(req);
        }
        if !inner.tuning.should_compile(&req.code) {
            cold_path();
            return LookupDecision::Interpret(InterpretReason::Ineligible);
        }

        let decision = if let Some(program_ref) = shared.resident.try_get(&req.key).try_unwrap() {
            let program = Arc::clone(&program_ref);
            // Release the map guard before touching the queue below.
            drop(program_ref);
            // Already compiled: drop the bytecode payload before enqueueing,
            // presumably so the event carries no redundant copy — TODO confirm
            // the backend thread doesn't need it for resident keys.
            req.code.clear();
            LookupDecision::Compiled(program)
        } else {
            LookupDecision::Interpret(InterpretReason::NotReady)
        };

        // Best-effort: if the fixed-capacity queue is full, count the drop
        // rather than blocking the execution path.
        if let Err(_v) = shared.events.push(req) {
            cold_path();
            shared.stats.events_dropped.fetch_add(1, Ordering::Relaxed);
        }

        decision
    }

    /// Returns the resident compiled program for `(code_hash, spec_id)`, if any.
    /// Does not touch hit/miss counters — see `get_compiled_tracked`.
    pub fn get_compiled(&self, code_hash: B256, spec_id: SpecId) -> Option<Arc<CompiledProgram>> {
        let key = RuntimeCacheKey { code_hash, spec_id };
        self.inner.shared.resident.get(&key).map(|entry| Arc::clone(&entry))
    }

    /// Like `get_compiled`, but also bumps the `lookup_hits`/`lookup_misses`
    /// statistics counters.
    pub fn get_compiled_tracked(
        &self,
        code_hash: B256,
        spec_id: SpecId,
    ) -> Option<Arc<CompiledProgram>> {
        let result = self.get_compiled(code_hash, spec_id);
        if result.is_some() {
            self.inner.shared.stats.lookup_hits.fetch_add(1, Ordering::Relaxed);
        } else {
            self.inner.shared.stats.lookup_misses.fetch_add(1, Ordering::Relaxed);
        }
        result
    }

    /// Synchronous lookup used in blocking mode: compile on the spot if the
    /// program is not already resident, then re-check the resident map.
    pub fn lookup_blocking(&self, req: LookupRequest) -> LookupDecision {
        if !self.inner.tuning.should_compile(&req.code) {
            return LookupDecision::Interpret(InterpretReason::Ineligible);
        }
        let code_hash = req.key.code_hash;
        let spec_id = req.key.spec_id;
        if let Some(program) = self.get_compiled_tracked(code_hash, spec_id) {
            return LookupDecision::Compiled(program);
        }
        if self.compile_jit_sync(req).is_err() {
            return LookupDecision::Interpret(InterpretReason::JitFailed);
        }
        // compile_jit_sync succeeded, but the program might still be missing
        // (e.g. the compilation itself failed backend-side); fall back then.
        match self.get_compiled_tracked(code_hash, spec_id) {
            Some(program) => LookupDecision::Compiled(program),
            None => LookupDecision::Interpret(InterpretReason::JitFailed),
        }
    }

    /// Fire-and-forget JIT compile request. Errors from starting the backend
    /// or from a closed channel are deliberately ignored.
    pub fn compile_jit(&self, req: LookupRequest) {
        let _ = self.ensure_started();
        let cmd = Command::CompileJit(CompileJitRequest {
            key: req.key,
            bytecode: req.code,
            sync_notifier: SyncNotifier::none(),
        });
        let _ = self.inner.tx.send(cmd);
    }

    /// JIT compile request that blocks until the backend thread signals
    /// completion through a one-shot channel.
    ///
    /// # Errors
    /// Fails if the backend cannot be started, the command channel is closed,
    /// or the backend shuts down before notifying.
    pub fn compile_jit_sync(&self, req: LookupRequest) -> eyre::Result<()> {
        self.ensure_started()?;
        let (tx, rx) = chan::bounded(1);
        let cmd = Command::CompileJit(CompileJitRequest {
            key: req.key,
            bytecode: req.code,
            sync_notifier: SyncNotifier::new(tx),
        });
        self.inner.tx.send(cmd).map_err(|_| eyre::eyre!("backend channel closed"))?;
        rx.recv().map_err(|_| eyre::eyre!("backend shut down before compilation completed"))
    }

    /// Queues a single AOT preparation request; see `prepare_aot_batch`.
    pub fn prepare_aot(&self, req: AotRequest) {
        self.prepare_aot_batch(vec![req]);
    }

    /// Queues a batch of AOT preparation requests. Fire-and-forget: start-up
    /// and send errors are ignored.
    pub fn prepare_aot_batch(&self, reqs: Vec<AotRequest>) {
        let _ = self.ensure_started();
        let owned: Vec<PrepareAotRequest> = reqs
            .into_iter()
            .map(|r| PrepareAotRequest {
                key: RuntimeCacheKey { code_hash: r.code_hash, spec_id: r.spec_id },
                bytecode: r.code,
            })
            .collect();
        let cmd = Command::PrepareAot(owned);
        let _ = self.inner.tx.send(cmd);
    }

    /// Asks the backend thread to drop all in-memory compiled programs.
    /// No-op if the thread was never started (send fails silently).
    pub fn clear_resident(&self) {
        let _ = self.inner.tx.send(Command::ClearResident);
    }

    /// Asks the backend thread to clear persisted artifacts.
    pub fn clear_persisted(&self) {
        let _ = self.inner.tx.send(Command::ClearPersisted);
    }

    /// Asks the backend thread to clear both resident and persisted state.
    pub fn clear_all(&self) {
        let _ = self.inner.tx.send(Command::ClearAll);
    }

    /// Current enabled flag (relaxed load).
    pub fn enabled(&self) -> bool {
        self.inner.enabled.load(Ordering::Relaxed)
    }

    /// Toggles the backend. Enabling also lazily starts the worker thread.
    ///
    /// # Errors
    /// Propagates `ensure_started` failures (AOT preload / thread spawn).
    pub fn set_enabled(&self, enabled: bool) -> eyre::Result<()> {
        debug!(enabled, "set_enabled");
        self.inner.enabled.store(enabled, Ordering::Relaxed);
        if enabled {
            self.ensure_started()?;
        }
        Ok(())
    }

    /// Snapshot of runtime statistics, including the live command-queue depth.
    pub fn stats(&self) -> RuntimeStatsSnapshot {
        self.inner.stats()
    }

    /// Spawns the backend thread exactly once. Subsequent calls are no-ops.
    ///
    /// Order matters here: AOT artifacts are preloaded into the resident map
    /// BEFORE the thread starts; on preload failure the `LazySpawnState` is
    /// put back so a later call can retry.
    fn ensure_started(&self) -> eyre::Result<()> {
        let mut guard = self.inner.lazy_spawn.lock().unwrap();
        // Already started (state consumed earlier): nothing to do.
        let Some(lazy) = guard.take() else {
            return Ok(());
        };

        let LazySpawnState { rx, config } = lazy;

        debug!(
            blocking = self.inner.blocking,
            workers = config.tuning.jit_worker_count,
            hot_threshold = config.tuning.jit_hot_threshold,
            channel_capacity = config.tuning.channel_capacity,
            "spawning backend thread",
        );

        match Self::preload_aot(config.store.as_deref()) {
            Ok(entries) => {
                for (key, prog) in entries {
                    self.inner.shared.resident.insert(key, prog);
                }
            }
            Err(e) => {
                // Restore the spawn state so the next ensure_started can retry.
                *guard = Some(LazySpawnState { rx, config });
                return Err(e);
            }
        }

        // Release the lazy_spawn lock before the (potentially slow) spawn;
        // racing callers now see `None` and return early above.
        drop(guard);

        let (done_tx, done_rx) = chan::bounded::<()>(1);
        let shared = Arc::clone(&self.inner.shared);

        let thread = std::thread::Builder::new()
            .name(config.thread_name.clone())
            .spawn(move || {
                backend::run(shared, rx, config);
                // Signal `shutdown` that the run loop has exited.
                let _ = done_tx.send(());
            })
            .wrap_err("failed to spawn backend thread")?;

        *self.inner.thread.lock().unwrap() = Some(BackendThread { handle: thread, done_rx });
        Ok(())
    }

    /// Loads all persisted AOT artifacts from `store` (if configured) into a
    /// list of resident-map entries. Individual artifact failures are logged
    /// and skipped; only `store.load_all()` failure aborts the preload.
    fn preload_aot(
        store: Option<&dyn ArtifactStore>,
    ) -> eyre::Result<Vec<(RuntimeCacheKey, Arc<CompiledProgram>)>> {
        let Some(store) = store else {
            debug!("no artifact store configured, skipping AOT preload");
            return Ok(Vec::new());
        };

        let span = info_span!("aot_preload");
        let _enter = span.enter();

        let artifacts = store.load_all()?;
        info!(count = artifacts.len(), "loading AOT artifacts");

        let mut out: Vec<(RuntimeCacheKey, Arc<CompiledProgram>)> =
            Vec::with_capacity(artifacts.len());
        let mut seen = alloy_primitives::map::HashSet::<RuntimeCacheKey>::default();
        let mut loaded = 0u64;
        let mut failed = 0u64;

        for (artifact_key, stored) in artifacts {
            match Self::load_artifact(&artifact_key, &stored) {
                Ok(program) => {
                    let key = artifact_key.runtime;
                    // First artifact wins on duplicate runtime keys.
                    if !seen.insert(key) {
                        warn!(
                            code_hash = %key.code_hash,
                            spec_id = ?key.spec_id,
                            "duplicate artifact key, keeping first",
                        );
                        continue;
                    }
                    out.push((key, Arc::new(program)));
                    loaded += 1;
                }
                Err(e) => {
                    warn!(
                        code_hash = %artifact_key.runtime.code_hash,
                        error = %e,
                        "failed to load artifact, skipping",
                    );
                    failed += 1;
                }
            }
        }

        info!(loaded, failed, "AOT preload complete");
        Ok(out)
    }

    /// dlopens a stored artifact and resolves its entry-point symbol.
    /// The returned `CompiledProgram` keeps the library alive via
    /// `Arc<LoadedLibrary>` so the raw `EvmCompilerFn` pointer stays valid.
    fn load_artifact(key: &ArtifactKey, stored: &StoredArtifact) -> eyre::Result<CompiledProgram> {
        // SAFETY: loading an arbitrary dylib runs its initializers; the store
        // is trusted to only contain artifacts this process produced.
        let library = unsafe { libloading::Library::new(&stored.dylib_path) }
            .wrap_err_with(|| format!("dlopen {:?}", stored.dylib_path))?;

        // SAFETY: the symbol is assumed to have the `EvmCompilerFn` ABI; the
        // manifest recorded at compile time names it. The copied fn pointer
        // outlives the `Symbol` borrow because `library` is kept alive below.
        let func: EvmCompilerFn = unsafe {
            let sym: libloading::Symbol<'_, EvmCompilerFn> = library
                .get(stored.manifest.symbol_name.as_bytes())
                .wrap_err_with(|| format!("symbol '{}'", stored.manifest.symbol_name))?;
            *sym
        };

        let library = Arc::new(LoadedLibrary::new(library));
        Ok(CompiledProgram::new_aot(key.runtime, func, library))
    }
}
460
461impl BackendShared {
462 pub(crate) fn stats(&self) -> RuntimeStatsSnapshot {
467 self.stats.snapshot(stats::RuntimeStatsGauges {
468 resident_entries: self.resident.len() as u64,
469 events_queued: self.events.len() as u64,
470 command_queue_len: 0,
471 })
472 }
473}
474
impl BackendInner {
    /// Statistics snapshot with the live command-queue depth filled in
    /// (the shared-state snapshot always reports 0 for it).
    pub(crate) fn stats(&self) -> RuntimeStatsSnapshot {
        let mut snap = self.shared.stats();
        snap.command_queue_len = self.tx.len() as u64;
        snap
    }

    /// Stops the backend thread: sends `Shutdown`, waits up to
    /// `shutdown_timeout` for the thread's done signal, then joins it.
    ///
    /// # Errors
    /// Fails if the thread does not exit within the timeout or panicked.
    fn shutdown(&self) -> eyre::Result<()> {
        debug!("shutting down JIT backend");
        // NOTE(review): the `thread` mutex guard (a temporary in the `if let`
        // scrutinee) stays locked for the whole body below, i.e. while we
        // wait and join — presumably intentional to serialize shutdowns.
        if let Some(ct) = self.thread.lock().unwrap().take() {
            // Best-effort: the channel may already be closed.
            let _ = self.tx.send(Command::Shutdown);

            match ct.done_rx.recv_timeout(self.shutdown_timeout) {
                // Disconnected means the thread dropped `done_tx`, i.e. it
                // exited (possibly by panic) — safe to proceed to join.
                Ok(()) | Err(chan::RecvTimeoutError::Disconnected) => {}
                Err(chan::RecvTimeoutError::Timeout) => {
                    eyre::bail!(
                        "backend thread did not exit within timeout ({:?})",
                        self.shutdown_timeout
                    );
                }
            }

            ct.handle.join().map_err(|_| eyre::eyre!("backend thread panicked"))?;
        }
        Ok(())
    }
}
506
507impl Drop for BackendInner {
508 fn drop(&mut self) {
509 if let Err(err) = self.shutdown() {
510 warn!(%err, "failed to shutdown JIT backend");
511 }
512 }
513}