1use crate::{
4 CompileTimings, OptimizationLevel, eyre,
5 runtime::{
6 config::{CompilationKind, RuntimeConfig},
7 stats::RuntimeStats,
8 storage::RuntimeCacheKey,
9 worker::{
10 CompileJob, CompilerState, CompilerTarget, JitObjectSuccess, SyncNotifier,
11 WorkerResult, WorkerSuccess, compile_with_state,
12 },
13 },
14};
15use alloy_primitives::{B256, Bytes};
16use crossbeam_channel as chan;
17use rayon::ThreadPoolBuilder;
18use revm_context_interface::cfg::{GasParams, gas_params::GasId};
19use revm_primitives::hardfork::SpecId;
20use std::{
21 cell::RefCell,
22 collections::HashMap,
23 io::{BufReader, BufWriter, Read, Write},
24 ops::ControlFlow,
25 os::unix::process::CommandExt,
26 path::PathBuf,
27 process::{Child, ChildStdin, Command, Stdio},
28 sync::{
29 Arc, Condvar, Mutex,
30 atomic::{AtomicBool, AtomicU64, Ordering},
31 },
32 thread::JoinHandle,
33 time::{Duration, Instant},
34};
35use wait_timeout::ChildExt;
36use wincode::{SchemaRead, SchemaWrite};
37
38const HELPER_ENV: &str = "REVMC_JIT_HELPER";
39const GAS_PARAM_COUNT: usize = 256;
40const HELPER_PAUSE_TIMEOUT: Duration = Duration::from_millis(100);
41
42type GasParamPairs = Vec<(u8, u64)>;
43type PendingResponses = Arc<Mutex<HashMap<u64, chan::Sender<Result<HelperJobResult, String>>>>>;
44type PendingPauses = Arc<Mutex<HashMap<u64, chan::Sender<Result<(), String>>>>>;
45
46pub(super) fn maybe_run_jit_helper() -> eyre::Result<ControlFlow<()>> {
48 if std::env::var_os(HELPER_ENV).is_none() {
49 return Ok(ControlFlow::Continue(()));
50 }
51 run_jit_helper_stdio()?;
52 Ok(ControlFlow::Break(()))
53}
54
55pub(super) fn compile_job(
57 job: CompileJob,
58 config: &RuntimeConfig,
59 helper: &HelperProcess,
60) -> WorkerResult {
61 let t0 = Instant::now();
62 let (outcome, timings) = match run_helper_job(&job, config, helper) {
63 Ok(result) => (result.outcome, result.timings),
64 Err(error) => (Err(error), CompileTimings::default()),
65 };
66 WorkerResult {
67 key: job.key,
68 outcome,
69 kind: job.kind,
70 sync_notifier: job.sync_notifier,
71 generation: job.generation,
72 compile_duration: t0.elapsed(),
73 timings,
74 }
75}
76
77fn run_helper_job(
78 job: &CompileJob,
79 config: &RuntimeConfig,
80 helper: &HelperProcess,
81) -> Result<HelperJobResult, String> {
82 helper.compile(job, config)
83}
84
85struct HelperJobResult {
86 outcome: Result<WorkerSuccess, String>,
87 timings: CompileTimings,
88}
89
90struct HelperIo {
91 stdin: BufWriter<ChildStdin>,
92}
93
94pub(super) struct HelperProcess {
95 inner: Mutex<Option<Arc<HelperProcessInner>>>,
96 paused: Arc<AtomicBool>,
97 stats: Arc<RuntimeStats>,
98}
99
100impl HelperProcess {
101 pub(super) fn new(stats: Arc<RuntimeStats>) -> Self {
102 Self { inner: Mutex::new(None), paused: Arc::new(AtomicBool::new(false)), stats }
103 }
104
105 fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result<HelperJobResult, String> {
106 let helper = {
107 let mut slot = self.inner.lock().unwrap();
108 if slot.as_ref().is_none_or(|helper| !helper.matches_config(config)) {
109 let restarting = slot.is_some();
110 debug!("spawning JIT helper");
111 match HelperProcessInner::spawn(config, self.stats.clone(), self.paused.clone()) {
112 Ok(helper) => {
113 if self.paused.load(Ordering::Relaxed) {
114 helper.pause();
115 }
116 self.stats.jit_helper_spawns.fetch_add(1, Ordering::Relaxed);
117 if restarting {
118 self.stats.jit_helper_restarts.fetch_add(1, Ordering::Relaxed);
119 }
120 *slot = Some(Arc::new(helper));
121 }
122 Err(err) => {
123 self.stats.jit_helper_spawn_failures.fetch_add(1, Ordering::Relaxed);
124 return Err(err);
125 }
126 }
127 }
128 slot.as_ref().unwrap().clone()
129 };
130
131 match helper.compile(job, config) {
132 Ok(result) => Ok(result),
133 Err(err) => {
134 let mut slot = self.inner.lock().unwrap();
135 if slot.as_ref().is_some_and(|current| Arc::ptr_eq(current, &helper)) {
136 warn!(error = %err, "discarding JIT helper after failed job");
137 self.stats.jit_helper_restarts.fetch_add(1, Ordering::Relaxed);
138 *slot = None;
139 }
140 Err(err)
141 }
142 }
143 }
144
145 pub(super) fn cancel_in_flight(&self) {
146 if let Some(helper) = self.inner.lock().unwrap().take() {
147 helper.kill();
148 }
149 }
150
151 pub(super) fn pause(&self) {
152 self.stats.jit_helper_pause_requests.fetch_add(1, Ordering::Relaxed);
153 self.paused.store(true, Ordering::Relaxed);
154 if let Some(helper) = self.inner.lock().unwrap().as_ref() {
155 helper.pause();
156 }
157 }
158
159 pub(super) fn resume(&self) {
160 self.stats.jit_helper_resume_requests.fetch_add(1, Ordering::Relaxed);
161 self.paused.store(false, Ordering::Relaxed);
162 if let Some(helper) = self.inner.lock().unwrap().as_ref() {
163 helper.resume();
164 }
165 }
166}
167
168struct HelperProcessInner {
169 path: PathBuf,
170 init: HelperInit,
171 child: Mutex<Child>,
172 io: Mutex<HelperIo>,
173 pending: PendingResponses,
174 pending_pauses: PendingPauses,
175 reader: Mutex<Option<JoinHandle<()>>>,
176 next_job_id: AtomicU64,
177 shutdown_timeout: Duration,
178 stats: Arc<RuntimeStats>,
179 paused: Arc<AtomicBool>,
180}
181
182impl HelperProcessInner {
183 fn spawn(
184 config: &RuntimeConfig,
185 stats: Arc<RuntimeStats>,
186 paused: Arc<AtomicBool>,
187 ) -> Result<Self, String> {
188 let path = match &config.jit_helper_path {
189 Some(path) => path.clone(),
190 None => {
191 std::env::current_exe().map_err(|e| format!("failed to locate current exe: {e}"))?
192 }
193 };
194 let init = helper_init(config);
195 let mut command = Command::new(&path);
196 command
197 .env(HELPER_ENV, "1")
198 .stdin(Stdio::piped())
199 .stdout(Stdio::piped())
200 .stderr(Stdio::inherit());
201 apply_helper_limits(&mut command, config);
202
203 let mut child = command.spawn().map_err(|e| format!("failed to spawn JIT helper: {e}"))?;
204 let mut stdin = BufWriter::new(child.stdin.take().ok_or("helper stdin unavailable")?);
205 write_init(&mut stdin, &init).map_err(|e| format!("failed to write helper init: {e}"))?;
206 stdin.flush().map_err(|e| format!("failed to flush helper init: {e}"))?;
207 let stdout = child.stdout.take().ok_or("helper stdout unavailable")?;
208 let pending = PendingResponses::default();
209 let pending_pauses = PendingPauses::default();
210 let reader_pending = pending.clone();
211 let reader_pending_pauses = pending_pauses.clone();
212 let reader_stats = stats.clone();
213 let reader = std::thread::spawn(move || {
214 let mut stdout = BufReader::new(stdout);
215 loop {
216 let result = read_helper_response(&mut stdout);
217 match result {
218 Ok(HelperResponseMessage::Job(id, result)) => {
219 if let Some(tx) = reader_pending.lock().unwrap().remove(&id) {
220 let _ = tx.send(Ok(result));
221 }
222 }
223 Ok(HelperResponseMessage::Paused(id)) => {
224 if let Some(tx) = reader_pending_pauses.lock().unwrap().remove(&id) {
225 let _ = tx.send(Ok(()));
226 }
227 }
228 Err(error) => {
229 reader_stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed);
230 for (_, tx) in reader_pending.lock().unwrap().drain() {
231 let _ = tx.send(Err(error.clone()));
232 }
233 for (_, tx) in reader_pending_pauses.lock().unwrap().drain() {
234 let _ = tx.send(Err(error.clone()));
235 }
236 break;
237 }
238 }
239 }
240 });
241 Ok(Self {
242 path,
243 init,
244 child: Mutex::new(child),
245 io: Mutex::new(HelperIo { stdin }),
246 pending,
247 pending_pauses,
248 reader: Mutex::new(Some(reader)),
249 next_job_id: AtomicU64::new(0),
250 shutdown_timeout: config.tuning.shutdown_timeout,
251 stats,
252 paused,
253 })
254 }
255
256 fn matches_config(&self, config: &RuntimeConfig) -> bool {
257 if self.init != helper_init(config) {
258 return false;
259 }
260 match &config.jit_helper_path {
261 Some(path) => self.path == *path,
262 None => std::env::current_exe().map(|path| self.path == path).unwrap_or(false),
263 }
264 }
265
266 fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result<HelperJobResult, String> {
267 let id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
268 let (tx, rx) = chan::bounded(1);
269 self.pending.lock().unwrap().insert(id, tx);
270
271 {
272 let mut io = self.io.lock().unwrap();
273 if let Err(e) = write_job(&mut io.stdin, id, job) {
274 self.pending.lock().unwrap().remove(&id);
275 return Err(format!("failed to write helper job: {e}"));
276 }
277 if let Err(e) = io.stdin.flush() {
278 self.pending.lock().unwrap().remove(&id);
279 return Err(format!("failed to flush helper job: {e}"));
280 }
281 }
282
283 loop {
284 match rx.recv_timeout(config.tuning.jit_timeout) {
285 Ok(result) => return result,
286 Err(chan::RecvTimeoutError::Timeout) if self.paused.load(Ordering::Relaxed) => {
287 continue;
288 }
289 Err(chan::RecvTimeoutError::Timeout) => {
290 warn!(timeout = ?config.tuning.jit_timeout, "JIT helper timed out");
291 self.pending.lock().unwrap().remove(&id);
292 self.stats.jit_helper_timeouts.fetch_add(1, Ordering::Relaxed);
293 self.kill();
294 return Err(format!(
295 "JIT helper timed out after {:?}; helper will be restarted",
296 config.tuning.jit_timeout
297 ));
298 }
299 Err(chan::RecvTimeoutError::Disconnected) => {
300 let status = self.child.lock().unwrap().try_wait().ok().flatten();
301 let message = match status {
302 Some(status) => format!("JIT helper exited with {status}"),
303 None => "JIT helper disconnected".into(),
304 };
305 warn!(message, "JIT helper disconnected");
306 self.stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed);
307 return Err(format!("{message}; helper will be restarted"));
308 }
309 }
310 }
311 }
312
313 fn kill(&self) -> bool {
314 let mut child = self.child.lock().unwrap();
315 if matches!(child.try_wait(), Ok(Some(_))) {
316 return true;
317 }
318 kill_helper(&mut child);
319 match child.wait_timeout(self.shutdown_timeout) {
320 Ok(Some(_)) => true,
321 Ok(None) => {
322 warn!(timeout = ?self.shutdown_timeout, "timed out waiting for JIT helper exit");
323 false
324 }
325 Err(err) => {
326 warn!(%err, "failed to wait for JIT helper exit");
327 false
328 }
329 }
330 }
331
332 fn pause(&self) {
333 let id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
334 let (tx, rx) = chan::bounded(1);
335 self.pending_pauses.lock().unwrap().insert(id, tx);
336
337 let send_result = {
338 let mut io = self.io.lock().unwrap();
339 write_pause(&mut io.stdin, id).and_then(|()| io.stdin.flush())
340 };
341
342 if let Err(err) = send_result {
343 self.pending_pauses.lock().unwrap().remove(&id);
344 self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
345 warn!(%err, "failed to request graceful JIT helper pause");
346 } else {
347 match rx.recv_timeout(HELPER_PAUSE_TIMEOUT) {
348 Ok(Ok(())) => {
349 self.stats.jit_helper_pause_acknowledgements.fetch_add(1, Ordering::Relaxed);
350 }
351 Ok(Err(err)) => {
352 self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
353 warn!(%err, "JIT helper graceful pause failed");
354 }
355 Err(chan::RecvTimeoutError::Timeout) => {
356 self.pending_pauses.lock().unwrap().remove(&id);
357 self.stats.jit_helper_pause_timeouts.fetch_add(1, Ordering::Relaxed);
358 warn!(timeout = ?HELPER_PAUSE_TIMEOUT, "timed out waiting for JIT helper pause");
359 }
360 Err(chan::RecvTimeoutError::Disconnected) => {
361 self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
362 warn!("JIT helper disconnected before pause acknowledgement");
363 }
364 }
365 }
366
367 self.signal(libc::SIGSTOP, "pause");
368 }
369
370 fn resume(&self) {
371 self.signal(libc::SIGCONT, "resume");
372 let send_result = {
373 let mut io = self.io.lock().unwrap();
374 write_resume(&mut io.stdin).and_then(|()| io.stdin.flush())
375 };
376 if let Err(err) = send_result {
377 self.stats.jit_helper_resume_failures.fetch_add(1, Ordering::Relaxed);
378 warn!(%err, "failed to request JIT helper resume");
379 }
380 }
381
382 fn signal(&self, signal: libc::c_int, action: &str) {
383 let mut child = self.child.lock().unwrap();
384 if matches!(child.try_wait(), Ok(Some(_))) {
385 return;
386 }
387 signal_helper(&child, signal, action);
388 }
389}
390
391impl Drop for HelperProcessInner {
392 fn drop(&mut self) {
393 let exited = self.kill();
394 if exited && let Some(reader) = self.reader.lock().unwrap().take() {
395 let _ = reader.join();
396 }
397 }
398}
399
400fn apply_helper_limits(command: &mut Command, config: &RuntimeConfig) {
401 let memory_limit = config.tuning.jit_helper_memory_limit_bytes;
402 let cpu_count = config.tuning.jit_helper_cpu_count;
403
404 unsafe {
407 command.pre_exec(move || {
408 set_process_group()?;
409 if memory_limit > 0 {
410 set_rlimit(libc::RLIMIT_AS as _, memory_limit)?;
411 }
412 if cpu_count > 0 {
413 limit_cpu_affinity(cpu_count)?;
414 }
415 Ok(())
416 });
417 }
418}
419
420fn set_process_group() -> std::io::Result<()> {
421 if unsafe { libc::setpgid(0, 0) } != 0 {
422 return Err(std::io::Error::last_os_error());
423 }
424 Ok(())
425}
426
427fn kill_helper(child: &mut Child) {
428 let pid = child.id() as libc::pid_t;
429 if unsafe { libc::kill(-pid, libc::SIGKILL) } == 0 {
430 return;
431 }
432
433 let err = std::io::Error::last_os_error();
434 if err.raw_os_error() != Some(libc::ESRCH) {
435 warn!(%err, "failed to kill JIT helper process group");
436 }
437 if let Err(err) = child.kill() {
438 warn!(%err, "failed to kill JIT helper");
439 }
440}
441
442fn signal_helper(child: &Child, signal: libc::c_int, action: &str) {
443 let pid = child.id() as libc::pid_t;
444 if unsafe { libc::kill(-pid, signal) } == 0 {
445 return;
446 }
447
448 let err = std::io::Error::last_os_error();
449 if err.raw_os_error() != Some(libc::ESRCH) {
450 warn!(%err, signal, action, "failed to signal JIT helper process group");
451 }
452}
453
454fn set_rlimit(resource: libc::c_int, value: u64) -> std::io::Result<()> {
455 let value = libc::rlim_t::try_from(value).unwrap_or(libc::rlim_t::MAX);
456 let limit = libc::rlimit { rlim_cur: value, rlim_max: value };
457 if unsafe { libc::setrlimit(resource as _, &limit) } != 0 {
458 return Err(std::io::Error::last_os_error());
459 }
460 Ok(())
461}
462
463#[cfg(any(target_os = "linux", target_os = "android"))]
464fn limit_cpu_affinity(cpu_count: usize) -> std::io::Result<()> {
465 let mut current = unsafe { std::mem::zeroed::<libc::cpu_set_t>() };
466 let size = std::mem::size_of::<libc::cpu_set_t>();
467 if unsafe { libc::sched_getaffinity(0, size, &mut current) } != 0 {
468 return Err(std::io::Error::last_os_error());
469 }
470
471 let mut limited = unsafe { std::mem::zeroed::<libc::cpu_set_t>() };
472 let mut remaining = cpu_count;
473 for cpu in 0..(8 * size) {
474 if unsafe { libc::CPU_ISSET(cpu, ¤t) } {
475 unsafe { libc::CPU_SET(cpu, &mut limited) };
476 remaining -= 1;
477 if remaining == 0 {
478 break;
479 }
480 }
481 }
482 if remaining == cpu_count {
483 return Err(std::io::Error::new(
484 std::io::ErrorKind::InvalidInput,
485 "current CPU affinity mask is empty",
486 ));
487 }
488
489 if unsafe { libc::sched_setaffinity(0, size, &limited) } != 0 {
490 return Err(std::io::Error::last_os_error());
491 }
492 Ok(())
493}
494
495#[cfg(not(any(target_os = "linux", target_os = "android")))]
496fn limit_cpu_affinity(_cpu_count: usize) -> std::io::Result<()> {
497 Ok(())
498}
499
500#[derive(Clone, PartialEq, Eq, SchemaWrite, SchemaRead)]
501struct HelperInit {
502 debug_assertions: bool,
503 single_error: bool,
504 no_dedup: bool,
505 no_dse: bool,
506 dump_dir: Option<String>,
507 gas_params: Option<GasParamPairs>,
508 jit_worker_count: usize,
509 compiler_recycle_threshold: usize,
510}
511
512#[derive(SchemaWrite, SchemaRead)]
513enum HelperRequest {
514 Init(HelperInit),
515 Compile(HelperCompile),
516 Pause { id: u64 },
517 Resume,
518}
519
520#[derive(SchemaWrite, SchemaRead)]
521struct HelperCompile {
522 id: u64,
523 code_hash: [u8; 32],
524 spec_id: u8,
525 opt_level: u8,
526 symbol_name: String,
527 bytecode: Vec<u8>,
528}
529
530#[derive(SchemaWrite, SchemaRead)]
531enum HelperResponse {
532 Ok {
533 id: u64,
534 symbol_name: String,
535 object_bytes: Vec<u8>,
536 builtin_symbols: Vec<String>,
537 timings: HelperTimings,
538 },
539 Err {
540 id: u64,
541 error: String,
542 timings: HelperTimings,
543 },
544 Paused {
545 id: u64,
546 },
547}
548
549#[derive(Clone, Copy, Default, SchemaWrite, SchemaRead)]
550struct HelperTimings {
551 parse: u64,
552 translate: u64,
553 optimize: u64,
554 codegen: u64,
555}
556
557fn write_init<W: Write + ?Sized>(w: &mut BufWriter<W>, init: &HelperInit) -> std::io::Result<()> {
558 write_message(w, &HelperRequest::Init(init.clone()))
559}
560
561fn write_job<W: Write + ?Sized>(
562 w: &mut BufWriter<W>,
563 id: u64,
564 job: &CompileJob,
565) -> std::io::Result<()> {
566 let req = HelperRequest::Compile(HelperCompile {
567 id,
568 code_hash: job.key.code_hash.0,
569 spec_id: job.key.spec_id as u8,
570 opt_level: opt_level_to_u8(job.opt_level),
571 symbol_name: job.symbol_name.clone(),
572 bytecode: job.bytecode.to_vec(),
573 });
574 write_message(w, &req)
575}
576
577fn write_pause<W: Write + ?Sized>(w: &mut BufWriter<W>, id: u64) -> std::io::Result<()> {
578 write_message(w, &HelperRequest::Pause { id })
579}
580
581fn write_resume<W: Write + ?Sized>(w: &mut BufWriter<W>) -> std::io::Result<()> {
582 write_message(w, &HelperRequest::Resume)
583}
584
585enum HelperResponseMessage {
586 Job(u64, HelperJobResult),
587 Paused(u64),
588}
589
590fn read_helper_response<R: Read + ?Sized>(
591 r: &mut BufReader<R>,
592) -> Result<HelperResponseMessage, String> {
593 match read_message(r).map_err(|e| format!("failed to decode helper result: {e}"))? {
594 HelperResponse::Ok { id, symbol_name, object_bytes, builtin_symbols, timings } => {
595 Ok(HelperResponseMessage::Job(
596 id,
597 HelperJobResult {
598 outcome: Ok(WorkerSuccess::JitObject(JitObjectSuccess {
599 symbol_name,
600 object_bytes: Bytes::from(object_bytes),
601 builtin_symbols,
602 })),
603 timings: timings.into(),
604 },
605 ))
606 }
607 HelperResponse::Err { id, error, timings } => Ok(HelperResponseMessage::Job(
608 id,
609 HelperJobResult { outcome: Err(error), timings: timings.into() },
610 )),
611 HelperResponse::Paused { id } => Ok(HelperResponseMessage::Paused(id)),
612 }
613}
614
615impl From<CompileTimings> for HelperTimings {
616 fn from(timings: CompileTimings) -> Self {
617 Self {
618 parse: duration_to_nanos(timings.parse),
619 translate: duration_to_nanos(timings.translate),
620 optimize: duration_to_nanos(timings.optimize),
621 codegen: duration_to_nanos(timings.codegen),
622 }
623 }
624}
625
626impl From<HelperTimings> for CompileTimings {
627 fn from(timings: HelperTimings) -> Self {
628 Self {
629 parse: Duration::from_nanos(timings.parse),
630 translate: Duration::from_nanos(timings.translate),
631 optimize: Duration::from_nanos(timings.optimize),
632 codegen: Duration::from_nanos(timings.codegen),
633 }
634 }
635}
636
637fn duration_to_nanos(duration: Duration) -> u64 {
638 u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
639}
640
641thread_local! {
642 static HELPER_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
643}
644
645#[derive(Default)]
646struct HelperPauseState {
647 paused: Mutex<bool>,
648 resumed: Condvar,
649}
650
651impl HelperPauseState {
652 fn pause(&self) {
653 *self.paused.lock().unwrap() = true;
654 }
655
656 fn resume(&self) {
657 *self.paused.lock().unwrap() = false;
658 self.resumed.notify_all();
659 }
660
661 fn wait_resumed(&self) {
662 let mut paused = self.paused.lock().unwrap();
663 while *paused {
664 paused = self.resumed.wait(paused).unwrap();
665 }
666 }
667}
668
669fn run_jit_helper_stdio() -> eyre::Result<()> {
670 let mut stdin = BufReader::new(std::io::stdin().lock());
671 let config = Arc::new(read_helper_init(&mut stdin)?);
672 let worker_count = config.tuning.jit_worker_count.max(1);
673 let pool = ThreadPoolBuilder::new()
674 .num_threads(worker_count)
675 .thread_name(|i| format!("revmc-helper-{i:02}"))
676 .exit_handler(|_| {
677 HELPER_COMPILER.with_borrow_mut(Option::take);
678 })
679 .build()?;
680 let stdout = Arc::new(Mutex::new(BufWriter::new(std::io::stdout())));
681 let pause_state = Arc::new(HelperPauseState::default());
682
683 loop {
684 let request = match read_helper_request(&mut stdin) {
685 Ok(request) => request,
686 Err(err) if is_unexpected_eof(&err) => break,
687 Err(err) => return Err(err),
688 };
689 match request {
690 HelperWork::Compile { id, job } => {
691 let config = Arc::clone(&config);
692 let stdout = Arc::clone(&stdout);
693 let pause_state = Arc::clone(&pause_state);
694 pool.spawn_fifo(move || {
695 let result = HELPER_COMPILER.with_borrow_mut(|compiler| {
696 compile_with_state(job, &config, CompilerTarget::JitObject, compiler)
697 });
698 pause_state.wait_resumed();
699 let mut stdout = stdout.lock().unwrap();
700 if let Err(err) = write_helper_result(&mut stdout, id, result)
701 .and_then(|()| stdout.flush().map_err(Into::into))
702 {
703 error!(%err, "failed to write helper result");
704 std::process::exit(1);
705 }
706 });
707 }
708 HelperWork::Pause { id } => {
709 pause_state.pause();
710 let mut stdout = stdout.lock().unwrap();
711 write_pause_ack(&mut stdout, id)?;
712 stdout.flush()?;
713 }
714 HelperWork::Resume => {
715 pause_state.resume();
716 }
717 }
718 }
719
720 Ok(())
721}
722
723fn read_helper_init<R: Read + ?Sized>(stdin: &mut BufReader<R>) -> eyre::Result<RuntimeConfig> {
724 match read_message(stdin)? {
725 HelperRequest::Init(init) => runtime_config_from_init(init),
726 HelperRequest::Compile(_) | HelperRequest::Pause { .. } | HelperRequest::Resume => {
727 eyre::bail!("JIT helper received request before init")
728 }
729 }
730}
731
732enum HelperWork {
733 Compile { id: u64, job: CompileJob },
734 Pause { id: u64 },
735 Resume,
736}
737
738fn read_helper_request<R: Read + ?Sized>(stdin: &mut BufReader<R>) -> eyre::Result<HelperWork> {
739 let req = match read_message(stdin)? {
740 HelperRequest::Compile(req) => req,
741 HelperRequest::Pause { id } => return Ok(HelperWork::Pause { id }),
742 HelperRequest::Resume => return Ok(HelperWork::Resume),
743 HelperRequest::Init(_) => eyre::bail!("JIT helper received duplicate init"),
744 };
745 let spec_id = SpecId::try_from_u8(req.spec_id).ok_or_else(|| eyre::eyre!("invalid spec id"))?;
746 let opt_level = opt_level_from_u8(req.opt_level)?;
747
748 let job = CompileJob {
749 kind: CompilationKind::Jit,
750 key: RuntimeCacheKey { code_hash: B256::from(req.code_hash), spec_id },
751 bytecode: Bytes::from(req.bytecode),
752 symbol_name: req.symbol_name,
753 opt_level,
754 sync_notifier: SyncNotifier::none(),
755 generation: 0,
756 };
757 Ok(HelperWork::Compile { id: req.id, job })
758}
759
760fn write_helper_result<W: Write + ?Sized>(
761 stdout: &mut BufWriter<W>,
762 id: u64,
763 result: WorkerResult,
764) -> eyre::Result<()> {
765 let timings = result.timings.into();
766 let response = match result.outcome {
767 Ok(WorkerSuccess::JitObject(success)) => HelperResponse::Ok {
768 id,
769 symbol_name: success.symbol_name,
770 object_bytes: success.object_bytes.to_vec(),
771 builtin_symbols: success.builtin_symbols,
772 timings,
773 },
774 Ok(_) => unreachable!(),
775 Err(error) => HelperResponse::Err { id, error, timings },
776 };
777 write_message(stdout, &response)?;
778 Ok(())
779}
780
781fn write_pause_ack<W: Write + ?Sized>(stdout: &mut BufWriter<W>, id: u64) -> eyre::Result<()> {
782 write_message(stdout, &HelperResponse::Paused { id })?;
783 Ok(())
784}
785
786fn write_message<T, W: Write + ?Sized>(w: &mut BufWriter<W>, message: &T) -> std::io::Result<()>
787where
788 T: wincode::SchemaWrite<wincode::config::DefaultConfig, Src = T>,
789{
790 wincode::serialize_into(w, message)
791 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
792}
793
794fn read_message<T, R: Read + ?Sized>(r: &mut BufReader<R>) -> std::io::Result<T>
795where
796 T: wincode::SchemaReadOwned<wincode::config::DefaultConfig, Dst = T>,
797{
798 wincode::deserialize_from(r).map_err(|e| std::io::Error::new(read_error_kind(&e), e))
799}
800
801fn read_error_kind(err: &wincode::ReadError) -> std::io::ErrorKind {
802 match err {
803 wincode::ReadError::Io(wincode::io::ReadError::ReadSizeLimit(_)) => {
804 std::io::ErrorKind::UnexpectedEof
805 }
806 _ => std::io::ErrorKind::InvalidData,
807 }
808}
809
810fn is_unexpected_eof(err: &eyre::Report) -> bool {
811 err.downcast_ref::<std::io::Error>()
812 .is_some_and(|err| err.kind() == std::io::ErrorKind::UnexpectedEof)
813}
814
815fn opt_level_to_u8(level: OptimizationLevel) -> u8 {
816 match level {
817 OptimizationLevel::None => 0,
818 OptimizationLevel::Less => 1,
819 OptimizationLevel::Default => 2,
820 OptimizationLevel::Aggressive => 3,
821 }
822}
823
824fn opt_level_from_u8(level: u8) -> eyre::Result<OptimizationLevel> {
825 Ok(match level {
826 0 => OptimizationLevel::None,
827 1 => OptimizationLevel::Less,
828 2 => OptimizationLevel::Default,
829 3 => OptimizationLevel::Aggressive,
830 _ => eyre::bail!("invalid optimization level"),
831 })
832}
833
834fn gas_params_to_pairs(gas_params: &GasParams) -> GasParamPairs {
835 (0..GAS_PARAM_COUNT)
836 .map(|i| {
837 let id = i as u8;
838 (id, gas_params.get(GasId::new(id)))
839 })
840 .collect()
841}
842
843fn gas_params_from_pairs(pairs: GasParamPairs) -> eyre::Result<GasParams> {
844 if pairs.len() != GAS_PARAM_COUNT {
845 eyre::bail!("invalid gas params length: {}", pairs.len());
846 }
847
848 let mut table = [0; GAS_PARAM_COUNT];
849 let mut seen = [false; GAS_PARAM_COUNT];
850 for (id, value) in pairs {
851 let index = usize::from(id);
852 if seen[index] {
853 eyre::bail!("duplicate gas param id: {id}");
854 }
855 seen[index] = true;
856 table[index] = value;
857 }
858 if let Some((id, _)) = seen.iter().enumerate().find(|(_, seen)| !**seen) {
859 eyre::bail!("missing gas param id: {id}");
860 }
861
862 Ok(GasParams::new(Arc::new(table)))
863}
864
865fn helper_init(config: &RuntimeConfig) -> HelperInit {
866 HelperInit {
867 debug_assertions: config.debug_assertions,
868 single_error: config.single_error,
869 no_dedup: config.no_dedup,
870 no_dse: config.no_dse,
871 dump_dir: config.dump_dir.as_ref().map(|path| path.to_string_lossy().into_owned()),
872 gas_params: config.gas_params.as_ref().map(gas_params_to_pairs),
873 jit_worker_count: config.tuning.jit_worker_count,
874 compiler_recycle_threshold: config.tuning.compiler_recycle_threshold,
875 }
876}
877
878fn runtime_config_from_init(init: HelperInit) -> eyre::Result<RuntimeConfig> {
879 let mut config = RuntimeConfig {
880 dump_dir: init.dump_dir.map(PathBuf::from),
881 debug_assertions: init.debug_assertions,
882 single_error: init.single_error,
883 no_dedup: init.no_dedup,
884 no_dse: init.no_dse,
885 gas_params: init.gas_params.map(gas_params_from_pairs).transpose()?,
886 ..Default::default()
887 };
888 config.tuning.jit_worker_count = init.jit_worker_count;
889 config.tuning.compiler_recycle_threshold = init.compiler_recycle_threshold;
890 Ok(config)
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896
897 #[test]
898 fn helper_pause_state_blocks_until_resume() {
899 let pause_state = Arc::new(HelperPauseState::default());
900 pause_state.pause();
901
902 let (tx, rx) = chan::bounded(1);
903 let thread_pause_state = Arc::clone(&pause_state);
904 let thread = std::thread::spawn(move || {
905 thread_pause_state.wait_resumed();
906 tx.send(()).unwrap();
907 });
908
909 assert!(rx.recv_timeout(Duration::from_millis(50)).is_err());
910 pause_state.resume();
911 rx.recv_timeout(Duration::from_secs(1)).unwrap();
912 thread.join().unwrap();
913 }
914
915 #[test]
916 fn helper_pause_protocol_roundtrips() {
917 let mut request = Vec::new();
918 {
919 let mut writer = BufWriter::new(&mut request);
920 write_pause(&mut writer, 42).unwrap();
921 writer.flush().unwrap();
922 }
923 let mut reader = BufReader::new(request.as_slice());
924 match read_helper_request(&mut reader).unwrap() {
925 HelperWork::Pause { id } => assert_eq!(id, 42),
926 HelperWork::Compile { .. } | HelperWork::Resume => panic!("unexpected helper request"),
927 }
928
929 let mut response = Vec::new();
930 {
931 let mut writer = BufWriter::new(&mut response);
932 write_pause_ack(&mut writer, 42).unwrap();
933 writer.flush().unwrap();
934 }
935 let mut reader = BufReader::new(response.as_slice());
936 match read_helper_response(&mut reader).unwrap() {
937 HelperResponseMessage::Paused(id) => assert_eq!(id, 42),
938 HelperResponseMessage::Job(..) => panic!("unexpected helper response"),
939 }
940 }
941
942 #[test]
943 fn helper_pause_resume_requests_update_stats() {
944 let stats = Arc::new(RuntimeStats::default());
945 let helper = HelperProcess::new(Arc::clone(&stats));
946
947 helper.pause();
948 helper.pause();
949 helper.resume();
950
951 let snapshot = stats.snapshot(crate::runtime::stats::RuntimeStatsGauges::default());
952 assert_eq!(snapshot.jit_helper_pause_requests, 2);
953 assert_eq!(snapshot.jit_helper_pause_acknowledgements, 0);
954 assert_eq!(snapshot.jit_helper_pause_failures, 0);
955 assert_eq!(snapshot.jit_helper_pause_timeouts, 0);
956 assert_eq!(snapshot.jit_helper_resume_requests, 1);
957 assert_eq!(snapshot.jit_helper_resume_failures, 0);
958 }
959}