Skip to main content

revmc_runtime/runtime/
out_of_process.rs

1//! Out-of-process JIT helper process and IPC.
2
3use crate::{
4    CompileTimings, OptimizationLevel, eyre,
5    runtime::{
6        config::{CompilationKind, RuntimeConfig},
7        stats::RuntimeStats,
8        storage::RuntimeCacheKey,
9        worker::{
10            CompileJob, CompilerState, CompilerTarget, JitObjectSuccess, SyncNotifier,
11            WorkerResult, WorkerSuccess, compile_with_state,
12        },
13    },
14};
15use alloy_primitives::{B256, Bytes};
16use crossbeam_channel as chan;
17use rayon::ThreadPoolBuilder;
18use revm_context_interface::cfg::{GasParams, gas_params::GasId};
19use revm_primitives::hardfork::SpecId;
20use std::{
21    cell::RefCell,
22    collections::HashMap,
23    io::{BufReader, BufWriter, Read, Write},
24    ops::ControlFlow,
25    os::unix::process::CommandExt,
26    path::PathBuf,
27    process::{Child, ChildStdin, Command, Stdio},
28    sync::{
29        Arc, Condvar, Mutex,
30        atomic::{AtomicBool, AtomicU64, Ordering},
31    },
32    thread::JoinHandle,
33    time::{Duration, Instant},
34};
35use wait_timeout::ChildExt;
36use wincode::{SchemaRead, SchemaWrite};
37
/// Environment variable whose presence marks a process as the JIT helper. It is set on the
/// spawned child and checked by `maybe_run_jit_helper`.
const HELPER_ENV: &str = "REVMC_JIT_HELPER";
/// Number of `(id, value)` gas-parameter pairs exchanged with the helper.
const GAS_PARAM_COUNT: usize = 256;
/// How long the parent waits for the helper to acknowledge a pause request.
const HELPER_PAUSE_TIMEOUT: Duration = Duration::from_millis(100);

/// Gas parameters flattened into `(id, value)` pairs so they can be serialized.
type GasParamPairs = Vec<(u8, u64)>;
/// Reply channels of in-flight compile jobs, keyed by request id.
type PendingResponses = Arc<Mutex<HashMap<u64, chan::Sender<Result<HelperJobResult, String>>>>>;
/// Reply channels of in-flight pause requests, keyed by request id.
type PendingPauses = Arc<Mutex<HashMap<u64, chan::Sender<Result<(), String>>>>>;
45
46/// Runs the out-of-process JIT helper if this process was launched as one.
47pub(super) fn maybe_run_jit_helper() -> eyre::Result<ControlFlow<()>> {
48    if std::env::var_os(HELPER_ENV).is_none() {
49        return Ok(ControlFlow::Continue(()));
50    }
51    run_jit_helper_stdio()?;
52    Ok(ControlFlow::Break(()))
53}
54
55/// Compiles a job in the out-of-process helper.
56pub(super) fn compile_job(
57    job: CompileJob,
58    config: &RuntimeConfig,
59    helper: &HelperProcess,
60) -> WorkerResult {
61    let t0 = Instant::now();
62    let (outcome, timings) = match run_helper_job(&job, config, helper) {
63        Ok(result) => (result.outcome, result.timings),
64        Err(error) => (Err(error), CompileTimings::default()),
65    };
66    WorkerResult {
67        key: job.key,
68        outcome,
69        kind: job.kind,
70        sync_notifier: job.sync_notifier,
71        generation: job.generation,
72        compile_duration: t0.elapsed(),
73        timings,
74    }
75}
76
/// Forwards `job` to `HelperProcess::compile`.
///
/// The outer `Err` is whatever error string `HelperProcess::compile` returns.
fn run_helper_job(
    job: &CompileJob,
    config: &RuntimeConfig,
    helper: &HelperProcess,
) -> Result<HelperJobResult, String> {
    helper.compile(job, config)
}
84
/// Result of one compile request as reported by the helper.
struct HelperJobResult {
    /// Compiled object on success, or the error string sent back by the helper.
    outcome: Result<WorkerSuccess, String>,
    /// Per-phase timings reported alongside the outcome.
    timings: CompileTimings,
}
89
/// Write side of the pipe to the helper process.
struct HelperIo {
    /// Buffered handle to the helper's stdin; requests are written and flushed through it.
    stdin: BufWriter<ChildStdin>,
}
93
/// Handle to the (lazily spawned, replaceable) out-of-process JIT helper.
pub(super) struct HelperProcess {
    /// Currently running helper, if any. `None` until the first compile or after a discard.
    inner: Mutex<Option<Arc<HelperProcessInner>>>,
    /// Whether a pause was requested; shared with every spawned helper instance.
    paused: Arc<AtomicBool>,
    /// Counters updated on spawns, restarts, pauses and failures.
    stats: Arc<RuntimeStats>,
}
99
100impl HelperProcess {
101    pub(super) fn new(stats: Arc<RuntimeStats>) -> Self {
102        Self { inner: Mutex::new(None), paused: Arc::new(AtomicBool::new(false)), stats }
103    }
104
105    fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result<HelperJobResult, String> {
106        let helper = {
107            let mut slot = self.inner.lock().unwrap();
108            if slot.as_ref().is_none_or(|helper| !helper.matches_config(config)) {
109                let restarting = slot.is_some();
110                debug!("spawning JIT helper");
111                match HelperProcessInner::spawn(config, self.stats.clone(), self.paused.clone()) {
112                    Ok(helper) => {
113                        if self.paused.load(Ordering::Relaxed) {
114                            helper.pause();
115                        }
116                        self.stats.jit_helper_spawns.fetch_add(1, Ordering::Relaxed);
117                        if restarting {
118                            self.stats.jit_helper_restarts.fetch_add(1, Ordering::Relaxed);
119                        }
120                        *slot = Some(Arc::new(helper));
121                    }
122                    Err(err) => {
123                        self.stats.jit_helper_spawn_failures.fetch_add(1, Ordering::Relaxed);
124                        return Err(err);
125                    }
126                }
127            }
128            slot.as_ref().unwrap().clone()
129        };
130
131        match helper.compile(job, config) {
132            Ok(result) => Ok(result),
133            Err(err) => {
134                let mut slot = self.inner.lock().unwrap();
135                if slot.as_ref().is_some_and(|current| Arc::ptr_eq(current, &helper)) {
136                    warn!(error = %err, "discarding JIT helper after failed job");
137                    self.stats.jit_helper_restarts.fetch_add(1, Ordering::Relaxed);
138                    *slot = None;
139                }
140                Err(err)
141            }
142        }
143    }
144
145    pub(super) fn cancel_in_flight(&self) {
146        if let Some(helper) = self.inner.lock().unwrap().take() {
147            helper.kill();
148        }
149    }
150
151    pub(super) fn pause(&self) {
152        self.stats.jit_helper_pause_requests.fetch_add(1, Ordering::Relaxed);
153        self.paused.store(true, Ordering::Relaxed);
154        if let Some(helper) = self.inner.lock().unwrap().as_ref() {
155            helper.pause();
156        }
157    }
158
159    pub(super) fn resume(&self) {
160        self.stats.jit_helper_resume_requests.fetch_add(1, Ordering::Relaxed);
161        self.paused.store(false, Ordering::Relaxed);
162        if let Some(helper) = self.inner.lock().unwrap().as_ref() {
163            helper.resume();
164        }
165    }
166}
167
/// One spawned helper process together with its IPC state.
struct HelperProcessInner {
    /// Executable the helper was spawned from.
    path: PathBuf,
    /// Init message the helper was started with; compared against new configs.
    init: HelperInit,
    /// Child process handle, used for signalling, killing and reaping.
    child: Mutex<Child>,
    /// Write side of the request pipe.
    io: Mutex<HelperIo>,
    /// Reply channels of compile jobs awaiting a response.
    pending: PendingResponses,
    /// Reply channels of pause requests awaiting an acknowledgement.
    pending_pauses: PendingPauses,
    /// Thread that reads responses from the helper's stdout; joined on drop.
    reader: Mutex<Option<JoinHandle<()>>>,
    /// Source of ids for both compile and pause requests.
    next_job_id: AtomicU64,
    /// Maximum time to wait for the process to exit after it was killed.
    shutdown_timeout: Duration,
    /// Shared runtime counters.
    stats: Arc<RuntimeStats>,
    /// Shared pause flag; while set, compile timeouts are not enforced.
    paused: Arc<AtomicBool>,
}
181
182impl HelperProcessInner {
183    fn spawn(
184        config: &RuntimeConfig,
185        stats: Arc<RuntimeStats>,
186        paused: Arc<AtomicBool>,
187    ) -> Result<Self, String> {
188        let path = match &config.jit_helper_path {
189            Some(path) => path.clone(),
190            None => {
191                std::env::current_exe().map_err(|e| format!("failed to locate current exe: {e}"))?
192            }
193        };
194        let init = helper_init(config);
195        let mut command = Command::new(&path);
196        command
197            .env(HELPER_ENV, "1")
198            .stdin(Stdio::piped())
199            .stdout(Stdio::piped())
200            .stderr(Stdio::inherit());
201        apply_helper_limits(&mut command, config);
202
203        let mut child = command.spawn().map_err(|e| format!("failed to spawn JIT helper: {e}"))?;
204        let mut stdin = BufWriter::new(child.stdin.take().ok_or("helper stdin unavailable")?);
205        write_init(&mut stdin, &init).map_err(|e| format!("failed to write helper init: {e}"))?;
206        stdin.flush().map_err(|e| format!("failed to flush helper init: {e}"))?;
207        let stdout = child.stdout.take().ok_or("helper stdout unavailable")?;
208        let pending = PendingResponses::default();
209        let pending_pauses = PendingPauses::default();
210        let reader_pending = pending.clone();
211        let reader_pending_pauses = pending_pauses.clone();
212        let reader_stats = stats.clone();
213        let reader = std::thread::spawn(move || {
214            let mut stdout = BufReader::new(stdout);
215            loop {
216                let result = read_helper_response(&mut stdout);
217                match result {
218                    Ok(HelperResponseMessage::Job(id, result)) => {
219                        if let Some(tx) = reader_pending.lock().unwrap().remove(&id) {
220                            let _ = tx.send(Ok(result));
221                        }
222                    }
223                    Ok(HelperResponseMessage::Paused(id)) => {
224                        if let Some(tx) = reader_pending_pauses.lock().unwrap().remove(&id) {
225                            let _ = tx.send(Ok(()));
226                        }
227                    }
228                    Err(error) => {
229                        reader_stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed);
230                        for (_, tx) in reader_pending.lock().unwrap().drain() {
231                            let _ = tx.send(Err(error.clone()));
232                        }
233                        for (_, tx) in reader_pending_pauses.lock().unwrap().drain() {
234                            let _ = tx.send(Err(error.clone()));
235                        }
236                        break;
237                    }
238                }
239            }
240        });
241        Ok(Self {
242            path,
243            init,
244            child: Mutex::new(child),
245            io: Mutex::new(HelperIo { stdin }),
246            pending,
247            pending_pauses,
248            reader: Mutex::new(Some(reader)),
249            next_job_id: AtomicU64::new(0),
250            shutdown_timeout: config.tuning.shutdown_timeout,
251            stats,
252            paused,
253        })
254    }
255
256    fn matches_config(&self, config: &RuntimeConfig) -> bool {
257        if self.init != helper_init(config) {
258            return false;
259        }
260        match &config.jit_helper_path {
261            Some(path) => self.path == *path,
262            None => std::env::current_exe().map(|path| self.path == path).unwrap_or(false),
263        }
264    }
265
266    fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result<HelperJobResult, String> {
267        let id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
268        let (tx, rx) = chan::bounded(1);
269        self.pending.lock().unwrap().insert(id, tx);
270
271        {
272            let mut io = self.io.lock().unwrap();
273            if let Err(e) = write_job(&mut io.stdin, id, job) {
274                self.pending.lock().unwrap().remove(&id);
275                return Err(format!("failed to write helper job: {e}"));
276            }
277            if let Err(e) = io.stdin.flush() {
278                self.pending.lock().unwrap().remove(&id);
279                return Err(format!("failed to flush helper job: {e}"));
280            }
281        }
282
283        loop {
284            match rx.recv_timeout(config.tuning.jit_timeout) {
285                Ok(result) => return result,
286                Err(chan::RecvTimeoutError::Timeout) if self.paused.load(Ordering::Relaxed) => {
287                    continue;
288                }
289                Err(chan::RecvTimeoutError::Timeout) => {
290                    warn!(timeout = ?config.tuning.jit_timeout, "JIT helper timed out");
291                    self.pending.lock().unwrap().remove(&id);
292                    self.stats.jit_helper_timeouts.fetch_add(1, Ordering::Relaxed);
293                    self.kill();
294                    return Err(format!(
295                        "JIT helper timed out after {:?}; helper will be restarted",
296                        config.tuning.jit_timeout
297                    ));
298                }
299                Err(chan::RecvTimeoutError::Disconnected) => {
300                    let status = self.child.lock().unwrap().try_wait().ok().flatten();
301                    let message = match status {
302                        Some(status) => format!("JIT helper exited with {status}"),
303                        None => "JIT helper disconnected".into(),
304                    };
305                    warn!(message, "JIT helper disconnected");
306                    self.stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed);
307                    return Err(format!("{message}; helper will be restarted"));
308                }
309            }
310        }
311    }
312
313    fn kill(&self) -> bool {
314        let mut child = self.child.lock().unwrap();
315        if matches!(child.try_wait(), Ok(Some(_))) {
316            return true;
317        }
318        kill_helper(&mut child);
319        match child.wait_timeout(self.shutdown_timeout) {
320            Ok(Some(_)) => true,
321            Ok(None) => {
322                warn!(timeout = ?self.shutdown_timeout, "timed out waiting for JIT helper exit");
323                false
324            }
325            Err(err) => {
326                warn!(%err, "failed to wait for JIT helper exit");
327                false
328            }
329        }
330    }
331
332    fn pause(&self) {
333        let id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
334        let (tx, rx) = chan::bounded(1);
335        self.pending_pauses.lock().unwrap().insert(id, tx);
336
337        let send_result = {
338            let mut io = self.io.lock().unwrap();
339            write_pause(&mut io.stdin, id).and_then(|()| io.stdin.flush())
340        };
341
342        if let Err(err) = send_result {
343            self.pending_pauses.lock().unwrap().remove(&id);
344            self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
345            warn!(%err, "failed to request graceful JIT helper pause");
346        } else {
347            match rx.recv_timeout(HELPER_PAUSE_TIMEOUT) {
348                Ok(Ok(())) => {
349                    self.stats.jit_helper_pause_acknowledgements.fetch_add(1, Ordering::Relaxed);
350                }
351                Ok(Err(err)) => {
352                    self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
353                    warn!(%err, "JIT helper graceful pause failed");
354                }
355                Err(chan::RecvTimeoutError::Timeout) => {
356                    self.pending_pauses.lock().unwrap().remove(&id);
357                    self.stats.jit_helper_pause_timeouts.fetch_add(1, Ordering::Relaxed);
358                    warn!(timeout = ?HELPER_PAUSE_TIMEOUT, "timed out waiting for JIT helper pause");
359                }
360                Err(chan::RecvTimeoutError::Disconnected) => {
361                    self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed);
362                    warn!("JIT helper disconnected before pause acknowledgement");
363                }
364            }
365        }
366
367        self.signal(libc::SIGSTOP, "pause");
368    }
369
370    fn resume(&self) {
371        self.signal(libc::SIGCONT, "resume");
372        let send_result = {
373            let mut io = self.io.lock().unwrap();
374            write_resume(&mut io.stdin).and_then(|()| io.stdin.flush())
375        };
376        if let Err(err) = send_result {
377            self.stats.jit_helper_resume_failures.fetch_add(1, Ordering::Relaxed);
378            warn!(%err, "failed to request JIT helper resume");
379        }
380    }
381
382    fn signal(&self, signal: libc::c_int, action: &str) {
383        let mut child = self.child.lock().unwrap();
384        if matches!(child.try_wait(), Ok(Some(_))) {
385            return;
386        }
387        signal_helper(&child, signal, action);
388    }
389}
390
391impl Drop for HelperProcessInner {
392    fn drop(&mut self) {
393        let exited = self.kill();
394        if exited && let Some(reader) = self.reader.lock().unwrap().take() {
395            let _ = reader.join();
396        }
397    }
398}
399
400fn apply_helper_limits(command: &mut Command, config: &RuntimeConfig) {
401    let memory_limit = config.tuning.jit_helper_memory_limit_bytes;
402    let cpu_count = config.tuning.jit_helper_cpu_count;
403
404    // SAFETY: `pre_exec` runs in the child after fork and before exec. The closure only calls
405    // libc process/resource/affinity syscalls and constructs an `io::Error` if they fail.
406    unsafe {
407        command.pre_exec(move || {
408            set_process_group()?;
409            if memory_limit > 0 {
410                set_rlimit(libc::RLIMIT_AS as _, memory_limit)?;
411            }
412            if cpu_count > 0 {
413                limit_cpu_affinity(cpu_count)?;
414            }
415            Ok(())
416        });
417    }
418}
419
420fn set_process_group() -> std::io::Result<()> {
421    if unsafe { libc::setpgid(0, 0) } != 0 {
422        return Err(std::io::Error::last_os_error());
423    }
424    Ok(())
425}
426
427fn kill_helper(child: &mut Child) {
428    let pid = child.id() as libc::pid_t;
429    if unsafe { libc::kill(-pid, libc::SIGKILL) } == 0 {
430        return;
431    }
432
433    let err = std::io::Error::last_os_error();
434    if err.raw_os_error() != Some(libc::ESRCH) {
435        warn!(%err, "failed to kill JIT helper process group");
436    }
437    if let Err(err) = child.kill() {
438        warn!(%err, "failed to kill JIT helper");
439    }
440}
441
442fn signal_helper(child: &Child, signal: libc::c_int, action: &str) {
443    let pid = child.id() as libc::pid_t;
444    if unsafe { libc::kill(-pid, signal) } == 0 {
445        return;
446    }
447
448    let err = std::io::Error::last_os_error();
449    if err.raw_os_error() != Some(libc::ESRCH) {
450        warn!(%err, signal, action, "failed to signal JIT helper process group");
451    }
452}
453
454fn set_rlimit(resource: libc::c_int, value: u64) -> std::io::Result<()> {
455    let value = libc::rlim_t::try_from(value).unwrap_or(libc::rlim_t::MAX);
456    let limit = libc::rlimit { rlim_cur: value, rlim_max: value };
457    if unsafe { libc::setrlimit(resource as _, &limit) } != 0 {
458        return Err(std::io::Error::last_os_error());
459    }
460    Ok(())
461}
462
463#[cfg(any(target_os = "linux", target_os = "android"))]
464fn limit_cpu_affinity(cpu_count: usize) -> std::io::Result<()> {
465    let mut current = unsafe { std::mem::zeroed::<libc::cpu_set_t>() };
466    let size = std::mem::size_of::<libc::cpu_set_t>();
467    if unsafe { libc::sched_getaffinity(0, size, &mut current) } != 0 {
468        return Err(std::io::Error::last_os_error());
469    }
470
471    let mut limited = unsafe { std::mem::zeroed::<libc::cpu_set_t>() };
472    let mut remaining = cpu_count;
473    for cpu in 0..(8 * size) {
474        if unsafe { libc::CPU_ISSET(cpu, &current) } {
475            unsafe { libc::CPU_SET(cpu, &mut limited) };
476            remaining -= 1;
477            if remaining == 0 {
478                break;
479            }
480        }
481    }
482    if remaining == cpu_count {
483        return Err(std::io::Error::new(
484            std::io::ErrorKind::InvalidInput,
485            "current CPU affinity mask is empty",
486        ));
487    }
488
489    if unsafe { libc::sched_setaffinity(0, size, &limited) } != 0 {
490        return Err(std::io::Error::last_os_error());
491    }
492    Ok(())
493}
494
/// Fallback for targets other than Linux/Android: does nothing and always succeeds.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn limit_cpu_affinity(_cpu_count: usize) -> std::io::Result<()> {
    Ok(())
}
499
/// Configuration subset sent to the helper as its first message.
///
/// Built from a `RuntimeConfig` by `helper_init` and turned back into one by
/// `runtime_config_from_init`. Equality is used to decide whether a running helper can be
/// reused for a new config.
// NOTE(review): field order is presumably part of the wincode encoding — confirm before
// reordering.
#[derive(Clone, PartialEq, Eq, SchemaWrite, SchemaRead)]
struct HelperInit {
    debug_assertions: bool,
    single_error: bool,
    no_dedup: bool,
    no_dse: bool,
    // Dump directory as a (lossily converted) string.
    dump_dir: Option<String>,
    // Gas parameters flattened to `(id, value)` pairs, if overridden.
    gas_params: Option<GasParamPairs>,
    jit_worker_count: usize,
    compiler_recycle_threshold: usize,
}
511
/// Messages sent from the parent process to the helper over its stdin.
#[derive(SchemaWrite, SchemaRead)]
enum HelperRequest {
    // Must be the first message; rejected as a duplicate afterwards.
    Init(HelperInit),
    // Compile one piece of bytecode.
    Compile(HelperCompile),
    // Request a pause; the helper answers with `HelperResponse::Paused` carrying the same id.
    Pause { id: u64 },
    // Lift a previous pause.
    Resume,
}
519
/// Payload of a compile request.
#[derive(SchemaWrite, SchemaRead)]
struct HelperCompile {
    // Request id echoed back in the response.
    id: u64,
    code_hash: [u8; 32],
    // `SpecId` cast to `u8`.
    spec_id: u8,
    // Optimization level encoded by `opt_level_to_u8`.
    opt_level: u8,
    symbol_name: String,
    bytecode: Vec<u8>,
}
529
/// Messages sent from the helper back to the parent over its stdout.
#[derive(SchemaWrite, SchemaRead)]
enum HelperResponse {
    // Successful compile of the request with the given id.
    Ok {
        id: u64,
        symbol_name: String,
        object_bytes: Vec<u8>,
        builtin_symbols: Vec<String>,
        timings: HelperTimings,
    },
    // Failed compile of the request with the given id.
    Err {
        id: u64,
        error: String,
        timings: HelperTimings,
    },
    // Acknowledgement of the pause request with the given id.
    Paused {
        id: u64,
    },
}
548
/// Serializable form of `CompileTimings`: each phase duration in nanoseconds, saturated to
/// `u64::MAX`.
#[derive(Clone, Copy, Default, SchemaWrite, SchemaRead)]
struct HelperTimings {
    parse: u64,
    translate: u64,
    optimize: u64,
    codegen: u64,
}
556
557fn write_init<W: Write + ?Sized>(w: &mut BufWriter<W>, init: &HelperInit) -> std::io::Result<()> {
558    write_message(w, &HelperRequest::Init(init.clone()))
559}
560
561fn write_job<W: Write + ?Sized>(
562    w: &mut BufWriter<W>,
563    id: u64,
564    job: &CompileJob,
565) -> std::io::Result<()> {
566    let req = HelperRequest::Compile(HelperCompile {
567        id,
568        code_hash: job.key.code_hash.0,
569        spec_id: job.key.spec_id as u8,
570        opt_level: opt_level_to_u8(job.opt_level),
571        symbol_name: job.symbol_name.clone(),
572        bytecode: job.bytecode.to_vec(),
573    });
574    write_message(w, &req)
575}
576
577fn write_pause<W: Write + ?Sized>(w: &mut BufWriter<W>, id: u64) -> std::io::Result<()> {
578    write_message(w, &HelperRequest::Pause { id })
579}
580
581fn write_resume<W: Write + ?Sized>(w: &mut BufWriter<W>) -> std::io::Result<()> {
582    write_message(w, &HelperRequest::Resume)
583}
584
/// Decoded helper response as consumed by the parent's reader thread.
enum HelperResponseMessage {
    /// Result of the compile request with the given id.
    Job(u64, HelperJobResult),
    /// Acknowledgement of the pause request with the given id.
    Paused(u64),
}
589
590fn read_helper_response<R: Read + ?Sized>(
591    r: &mut BufReader<R>,
592) -> Result<HelperResponseMessage, String> {
593    match read_message(r).map_err(|e| format!("failed to decode helper result: {e}"))? {
594        HelperResponse::Ok { id, symbol_name, object_bytes, builtin_symbols, timings } => {
595            Ok(HelperResponseMessage::Job(
596                id,
597                HelperJobResult {
598                    outcome: Ok(WorkerSuccess::JitObject(JitObjectSuccess {
599                        symbol_name,
600                        object_bytes: Bytes::from(object_bytes),
601                        builtin_symbols,
602                    })),
603                    timings: timings.into(),
604                },
605            ))
606        }
607        HelperResponse::Err { id, error, timings } => Ok(HelperResponseMessage::Job(
608            id,
609            HelperJobResult { outcome: Err(error), timings: timings.into() },
610        )),
611        HelperResponse::Paused { id } => Ok(HelperResponseMessage::Paused(id)),
612    }
613}
614
615impl From<CompileTimings> for HelperTimings {
616    fn from(timings: CompileTimings) -> Self {
617        Self {
618            parse: duration_to_nanos(timings.parse),
619            translate: duration_to_nanos(timings.translate),
620            optimize: duration_to_nanos(timings.optimize),
621            codegen: duration_to_nanos(timings.codegen),
622        }
623    }
624}
625
626impl From<HelperTimings> for CompileTimings {
627    fn from(timings: HelperTimings) -> Self {
628        Self {
629            parse: Duration::from_nanos(timings.parse),
630            translate: Duration::from_nanos(timings.translate),
631            optimize: Duration::from_nanos(timings.optimize),
632            codegen: Duration::from_nanos(timings.codegen),
633        }
634    }
635}
636
/// Returns `duration` in whole nanoseconds, saturating at `u64::MAX`.
fn duration_to_nanos(duration: Duration) -> u64 {
    match u64::try_from(duration.as_nanos()) {
        Ok(nanos) => nanos,
        Err(_) => u64::MAX,
    }
}
640
thread_local! {
    // Per-thread compiler state passed to `compile_with_state` by the helper's pool threads;
    // taken (and thereby dropped) in the pool's exit handler.
    static HELPER_COMPILER: RefCell<Option<CompilerState>> = const { RefCell::new(None) };
}
644
/// Pause flag with a condition variable, used inside the helper to hold back results while
/// paused.
#[derive(Default)]
struct HelperPauseState {
    /// `true` while paused.
    paused: Mutex<bool>,
    /// Notified whenever the flag is cleared.
    resumed: Condvar,
}

impl HelperPauseState {
    /// Sets the paused flag.
    fn pause(&self) {
        let mut flag = self.paused.lock().unwrap();
        *flag = true;
    }

    /// Clears the paused flag and wakes every thread blocked in `wait_resumed`.
    fn resume(&self) {
        {
            let mut flag = self.paused.lock().unwrap();
            *flag = false;
        }
        self.resumed.notify_all();
    }

    /// Blocks the calling thread until the paused flag is clear; returns at once if it already
    /// is.
    fn wait_resumed(&self) {
        let flag = self.paused.lock().unwrap();
        let _cleared = self.resumed.wait_while(flag, |paused| *paused).unwrap();
    }
}
668
669fn run_jit_helper_stdio() -> eyre::Result<()> {
670    let mut stdin = BufReader::new(std::io::stdin().lock());
671    let config = Arc::new(read_helper_init(&mut stdin)?);
672    let worker_count = config.tuning.jit_worker_count.max(1);
673    let pool = ThreadPoolBuilder::new()
674        .num_threads(worker_count)
675        .thread_name(|i| format!("revmc-helper-{i:02}"))
676        .exit_handler(|_| {
677            HELPER_COMPILER.with_borrow_mut(Option::take);
678        })
679        .build()?;
680    let stdout = Arc::new(Mutex::new(BufWriter::new(std::io::stdout())));
681    let pause_state = Arc::new(HelperPauseState::default());
682
683    loop {
684        let request = match read_helper_request(&mut stdin) {
685            Ok(request) => request,
686            Err(err) if is_unexpected_eof(&err) => break,
687            Err(err) => return Err(err),
688        };
689        match request {
690            HelperWork::Compile { id, job } => {
691                let config = Arc::clone(&config);
692                let stdout = Arc::clone(&stdout);
693                let pause_state = Arc::clone(&pause_state);
694                pool.spawn_fifo(move || {
695                    let result = HELPER_COMPILER.with_borrow_mut(|compiler| {
696                        compile_with_state(job, &config, CompilerTarget::JitObject, compiler)
697                    });
698                    pause_state.wait_resumed();
699                    let mut stdout = stdout.lock().unwrap();
700                    if let Err(err) = write_helper_result(&mut stdout, id, result)
701                        .and_then(|()| stdout.flush().map_err(Into::into))
702                    {
703                        error!(%err, "failed to write helper result");
704                        std::process::exit(1);
705                    }
706                });
707            }
708            HelperWork::Pause { id } => {
709                pause_state.pause();
710                let mut stdout = stdout.lock().unwrap();
711                write_pause_ack(&mut stdout, id)?;
712                stdout.flush()?;
713            }
714            HelperWork::Resume => {
715                pause_state.resume();
716            }
717        }
718    }
719
720    Ok(())
721}
722
723fn read_helper_init<R: Read + ?Sized>(stdin: &mut BufReader<R>) -> eyre::Result<RuntimeConfig> {
724    match read_message(stdin)? {
725        HelperRequest::Init(init) => runtime_config_from_init(init),
726        HelperRequest::Compile(_) | HelperRequest::Pause { .. } | HelperRequest::Resume => {
727            eyre::bail!("JIT helper received request before init")
728        }
729    }
730}
731
/// A validated request as handled by the helper's main loop.
enum HelperWork {
    /// Compile `job` and answer with the given id.
    Compile { id: u64, job: CompileJob },
    /// Enter the paused state and acknowledge with the given id.
    Pause { id: u64 },
    /// Leave the paused state.
    Resume,
}
737
738fn read_helper_request<R: Read + ?Sized>(stdin: &mut BufReader<R>) -> eyre::Result<HelperWork> {
739    let req = match read_message(stdin)? {
740        HelperRequest::Compile(req) => req,
741        HelperRequest::Pause { id } => return Ok(HelperWork::Pause { id }),
742        HelperRequest::Resume => return Ok(HelperWork::Resume),
743        HelperRequest::Init(_) => eyre::bail!("JIT helper received duplicate init"),
744    };
745    let spec_id = SpecId::try_from_u8(req.spec_id).ok_or_else(|| eyre::eyre!("invalid spec id"))?;
746    let opt_level = opt_level_from_u8(req.opt_level)?;
747
748    let job = CompileJob {
749        kind: CompilationKind::Jit,
750        key: RuntimeCacheKey { code_hash: B256::from(req.code_hash), spec_id },
751        bytecode: Bytes::from(req.bytecode),
752        symbol_name: req.symbol_name,
753        opt_level,
754        sync_notifier: SyncNotifier::none(),
755        generation: 0,
756    };
757    Ok(HelperWork::Compile { id: req.id, job })
758}
759
760fn write_helper_result<W: Write + ?Sized>(
761    stdout: &mut BufWriter<W>,
762    id: u64,
763    result: WorkerResult,
764) -> eyre::Result<()> {
765    let timings = result.timings.into();
766    let response = match result.outcome {
767        Ok(WorkerSuccess::JitObject(success)) => HelperResponse::Ok {
768            id,
769            symbol_name: success.symbol_name,
770            object_bytes: success.object_bytes.to_vec(),
771            builtin_symbols: success.builtin_symbols,
772            timings,
773        },
774        Ok(_) => unreachable!(),
775        Err(error) => HelperResponse::Err { id, error, timings },
776    };
777    write_message(stdout, &response)?;
778    Ok(())
779}
780
781fn write_pause_ack<W: Write + ?Sized>(stdout: &mut BufWriter<W>, id: u64) -> eyre::Result<()> {
782    write_message(stdout, &HelperResponse::Paused { id })?;
783    Ok(())
784}
785
/// Serializes `message` into `w` with `wincode::serialize_into`.
///
/// Serialization errors are wrapped in an `io::Error` of kind `InvalidData`. The writer is not
/// flushed here.
fn write_message<T, W: Write + ?Sized>(w: &mut BufWriter<W>, message: &T) -> std::io::Result<()>
where
    T: wincode::SchemaWrite<wincode::config::DefaultConfig, Src = T>,
{
    wincode::serialize_into(w, message)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
793
/// Deserializes one `T` from `r` with `wincode::deserialize_from`.
///
/// Decode errors are wrapped in an `io::Error` whose kind is chosen by `read_error_kind`.
fn read_message<T, R: Read + ?Sized>(r: &mut BufReader<R>) -> std::io::Result<T>
where
    T: wincode::SchemaReadOwned<wincode::config::DefaultConfig, Dst = T>,
{
    wincode::deserialize_from(r).map_err(|e| std::io::Error::new(read_error_kind(&e), e))
}
800
801fn read_error_kind(err: &wincode::ReadError) -> std::io::ErrorKind {
802    match err {
803        wincode::ReadError::Io(wincode::io::ReadError::ReadSizeLimit(_)) => {
804            std::io::ErrorKind::UnexpectedEof
805        }
806        _ => std::io::ErrorKind::InvalidData,
807    }
808}
809
810fn is_unexpected_eof(err: &eyre::Report) -> bool {
811    err.downcast_ref::<std::io::Error>()
812        .is_some_and(|err| err.kind() == std::io::ErrorKind::UnexpectedEof)
813}
814
815fn opt_level_to_u8(level: OptimizationLevel) -> u8 {
816    match level {
817        OptimizationLevel::None => 0,
818        OptimizationLevel::Less => 1,
819        OptimizationLevel::Default => 2,
820        OptimizationLevel::Aggressive => 3,
821    }
822}
823
824fn opt_level_from_u8(level: u8) -> eyre::Result<OptimizationLevel> {
825    Ok(match level {
826        0 => OptimizationLevel::None,
827        1 => OptimizationLevel::Less,
828        2 => OptimizationLevel::Default,
829        3 => OptimizationLevel::Aggressive,
830        _ => eyre::bail!("invalid optimization level"),
831    })
832}
833
834fn gas_params_to_pairs(gas_params: &GasParams) -> GasParamPairs {
835    (0..GAS_PARAM_COUNT)
836        .map(|i| {
837            let id = i as u8;
838            (id, gas_params.get(GasId::new(id)))
839        })
840        .collect()
841}
842
843fn gas_params_from_pairs(pairs: GasParamPairs) -> eyre::Result<GasParams> {
844    if pairs.len() != GAS_PARAM_COUNT {
845        eyre::bail!("invalid gas params length: {}", pairs.len());
846    }
847
848    let mut table = [0; GAS_PARAM_COUNT];
849    let mut seen = [false; GAS_PARAM_COUNT];
850    for (id, value) in pairs {
851        let index = usize::from(id);
852        if seen[index] {
853            eyre::bail!("duplicate gas param id: {id}");
854        }
855        seen[index] = true;
856        table[index] = value;
857    }
858    if let Some((id, _)) = seen.iter().enumerate().find(|(_, seen)| !**seen) {
859        eyre::bail!("missing gas param id: {id}");
860    }
861
862    Ok(GasParams::new(Arc::new(table)))
863}
864
865fn helper_init(config: &RuntimeConfig) -> HelperInit {
866    HelperInit {
867        debug_assertions: config.debug_assertions,
868        single_error: config.single_error,
869        no_dedup: config.no_dedup,
870        no_dse: config.no_dse,
871        dump_dir: config.dump_dir.as_ref().map(|path| path.to_string_lossy().into_owned()),
872        gas_params: config.gas_params.as_ref().map(gas_params_to_pairs),
873        jit_worker_count: config.tuning.jit_worker_count,
874        compiler_recycle_threshold: config.tuning.compiler_recycle_threshold,
875    }
876}
877
878fn runtime_config_from_init(init: HelperInit) -> eyre::Result<RuntimeConfig> {
879    let mut config = RuntimeConfig {
880        dump_dir: init.dump_dir.map(PathBuf::from),
881        debug_assertions: init.debug_assertions,
882        single_error: init.single_error,
883        no_dedup: init.no_dedup,
884        no_dse: init.no_dse,
885        gas_params: init.gas_params.map(gas_params_from_pairs).transpose()?,
886        ..Default::default()
887    };
888    config.tuning.jit_worker_count = init.jit_worker_count;
889    config.tuning.compiler_recycle_threshold = init.compiler_recycle_threshold;
890    Ok(config)
891}
892
#[cfg(test)]
mod tests {
    use super::*;

    /// A thread blocked in `wait_resumed` must stay blocked until `resume` is called.
    #[test]
    fn helper_pause_state_blocks_until_resume() {
        let state = Arc::new(HelperPauseState::default());
        state.pause();

        let (done_tx, done_rx) = chan::bounded(1);
        let waiter_state = Arc::clone(&state);
        let waiter = std::thread::spawn(move || {
            waiter_state.wait_resumed();
            done_tx.send(()).unwrap();
        });

        let still_blocked = done_rx.recv_timeout(Duration::from_millis(50)).is_err();
        assert!(still_blocked);
        state.resume();
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        waiter.join().unwrap();
    }

    /// Pause requests and acknowledgements survive an encode/decode round trip.
    #[test]
    fn helper_pause_protocol_roundtrips() {
        let mut request_bytes = Vec::new();
        {
            let mut sink = BufWriter::new(&mut request_bytes);
            write_pause(&mut sink, 42).unwrap();
            sink.flush().unwrap();
        }
        let mut source = BufReader::new(request_bytes.as_slice());
        let HelperWork::Pause { id } = read_helper_request(&mut source).unwrap() else {
            panic!("unexpected helper request")
        };
        assert_eq!(id, 42);

        let mut response_bytes = Vec::new();
        {
            let mut sink = BufWriter::new(&mut response_bytes);
            write_pause_ack(&mut sink, 42).unwrap();
            sink.flush().unwrap();
        }
        let mut source = BufReader::new(response_bytes.as_slice());
        let HelperResponseMessage::Paused(id) = read_helper_response(&mut source).unwrap() else {
            panic!("unexpected helper response")
        };
        assert_eq!(id, 42);
    }

    /// Without a running helper, pause/resume only bump the request counters.
    #[test]
    fn helper_pause_resume_requests_update_stats() {
        let stats = Arc::new(RuntimeStats::default());
        let process = HelperProcess::new(Arc::clone(&stats));

        process.pause();
        process.pause();
        process.resume();

        let gauges = crate::runtime::stats::RuntimeStatsGauges::default();
        let snapshot = stats.snapshot(gauges);
        assert_eq!(snapshot.jit_helper_pause_requests, 2);
        assert_eq!(snapshot.jit_helper_pause_acknowledgements, 0);
        assert_eq!(snapshot.jit_helper_pause_failures, 0);
        assert_eq!(snapshot.jit_helper_pause_timeouts, 0);
        assert_eq!(snapshot.jit_helper_resume_requests, 1);
        assert_eq!(snapshot.jit_helper_resume_failures, 0);
    }
}
959}