mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
12
Lib/test/test_signal.py
vendored
12
Lib/test/test_signal.py
vendored
@@ -127,8 +127,6 @@ class PosixTests(unittest.TestCase):
|
||||
self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
|
||||
self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
# Issue 3864, unknown if this affects earlier versions of freebsd also
|
||||
def test_interprocess_signal(self):
|
||||
dirname = os.path.dirname(__file__)
|
||||
@@ -820,8 +818,6 @@ class ItimerTest(unittest.TestCase):
|
||||
self.hndl_called = True
|
||||
signal.setitimer(signal.ITIMER_PROF, 0)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_itimer_exc(self):
|
||||
# XXX I'm assuming -1 is an invalid itimer, but maybe some platform
|
||||
# defines it ?
|
||||
@@ -831,16 +827,12 @@ class ItimerTest(unittest.TestCase):
|
||||
self.assertRaises(signal.ItimerError,
|
||||
signal.setitimer, signal.ITIMER_REAL, -1)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_itimer_real(self):
|
||||
self.itimer = signal.ITIMER_REAL
|
||||
signal.setitimer(self.itimer, 1.0)
|
||||
signal.pause()
|
||||
self.assertEqual(self.hndl_called, True)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
# Issue 3864, unknown if this affects earlier versions of freebsd also
|
||||
@unittest.skipIf(sys.platform in ('netbsd5',),
|
||||
'itimer not reliable (does not mix well with threading) on some BSDs.')
|
||||
@@ -861,8 +853,6 @@ class ItimerTest(unittest.TestCase):
|
||||
# and the handler should have been called
|
||||
self.assertEqual(self.hndl_called, True)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_itimer_prof(self):
|
||||
self.itimer = signal.ITIMER_PROF
|
||||
signal.signal(signal.SIGPROF, self.sig_prof)
|
||||
@@ -880,8 +870,6 @@ class ItimerTest(unittest.TestCase):
|
||||
# and the handler should have been called
|
||||
self.assertEqual(self.hndl_called, True)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_setitimer_tiny(self):
|
||||
# bpo-30807: C setitimer() takes a microsecond-resolution interval.
|
||||
# Check that float -> timeval conversion doesn't round
|
||||
|
||||
@@ -1738,9 +1738,17 @@ pub mod module {
|
||||
#[pyfunction]
|
||||
fn waitpid(pid: libc::pid_t, opt: i32, vm: &VirtualMachine) -> PyResult<(libc::pid_t, i32)> {
|
||||
let mut status = 0;
|
||||
let pid = unsafe { libc::waitpid(pid, &mut status, opt) };
|
||||
let pid = nix::Error::result(pid).map_err(|err| err.into_pyexception(vm))?;
|
||||
Ok((pid, status))
|
||||
loop {
|
||||
let res = unsafe { libc::waitpid(pid, &mut status, opt) };
|
||||
if res == -1 {
|
||||
if nix::Error::last_raw() == libc::EINTR {
|
||||
vm.check_signals()?;
|
||||
continue;
|
||||
}
|
||||
return Err(nix::Error::last().into_pyexception(vm));
|
||||
}
|
||||
return Ok((res, status));
|
||||
}
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
|
||||
@@ -19,6 +19,11 @@ pub(crate) mod _signal {
|
||||
convert::{IntoPyException, TryFromBorrowedObject},
|
||||
};
|
||||
use crate::{PyObjectRef, PyResult, VirtualMachine, signal};
|
||||
#[cfg(unix)]
|
||||
use crate::{
|
||||
builtins::PyTypeRef,
|
||||
function::{ArgIntoFloat, OptionalArg},
|
||||
};
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
@@ -89,6 +94,18 @@ pub(crate) mod _signal {
|
||||
fn siginterrupt(sig: i32, flag: i32) -> i32;
|
||||
}
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
mod ffi {
|
||||
unsafe extern "C" {
|
||||
pub fn getitimer(which: libc::c_int, curr_value: *mut libc::itimerval) -> libc::c_int;
|
||||
pub fn setitimer(
|
||||
which: libc::c_int,
|
||||
new_value: *const libc::itimerval,
|
||||
old_value: *mut libc::itimerval,
|
||||
) -> libc::c_int;
|
||||
}
|
||||
}
|
||||
|
||||
#[pyattr]
|
||||
use crate::signal::NSIG;
|
||||
|
||||
@@ -114,6 +131,31 @@ pub(crate) mod _signal {
|
||||
#[pyattr]
|
||||
use libc::{SIGPWR, SIGSTKFLT};
|
||||
|
||||
// Interval timer constants
|
||||
#[cfg(all(unix, not(target_os = "android")))]
|
||||
#[pyattr]
|
||||
use libc::{ITIMER_PROF, ITIMER_REAL, ITIMER_VIRTUAL};
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
#[pyattr]
|
||||
const ITIMER_REAL: libc::c_int = 0;
|
||||
#[cfg(target_os = "android")]
|
||||
#[pyattr]
|
||||
const ITIMER_VIRTUAL: libc::c_int = 1;
|
||||
#[cfg(target_os = "android")]
|
||||
#[pyattr]
|
||||
const ITIMER_PROF: libc::c_int = 2;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[pyattr(name = "ItimerError", once)]
|
||||
fn itimer_error(vm: &VirtualMachine) -> PyTypeRef {
|
||||
vm.ctx.new_exception_type(
|
||||
"signal",
|
||||
"ItimerError",
|
||||
Some(vec![vm.ctx.exceptions.os_error.to_owned()]),
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(any(unix, windows))]
|
||||
pub(super) fn init_signal_handlers(
|
||||
module: &Py<crate::builtins::PyModule>,
|
||||
@@ -216,6 +258,80 @@ pub(crate) mod _signal {
|
||||
prev_time.unwrap_or(0)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[pyfunction]
|
||||
fn pause(vm: &VirtualMachine) -> PyResult<()> {
|
||||
unsafe { libc::pause() };
|
||||
signal::check_signals(vm)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn timeval_to_double(tv: &libc::timeval) -> f64 {
|
||||
tv.tv_sec as f64 + (tv.tv_usec as f64 / 1_000_000.0)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn double_to_timeval(val: f64) -> libc::timeval {
|
||||
libc::timeval {
|
||||
tv_sec: val.trunc() as _,
|
||||
tv_usec: ((val.fract()) * 1_000_000.0) as _,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn itimerval_to_tuple(it: &libc::itimerval) -> (f64, f64) {
|
||||
(
|
||||
timeval_to_double(&it.it_value),
|
||||
timeval_to_double(&it.it_interval),
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[pyfunction]
|
||||
fn setitimer(
|
||||
which: i32,
|
||||
seconds: ArgIntoFloat,
|
||||
interval: OptionalArg<ArgIntoFloat>,
|
||||
vm: &VirtualMachine,
|
||||
) -> PyResult<(f64, f64)> {
|
||||
let seconds: f64 = seconds.into();
|
||||
let interval: f64 = interval.map(|v| v.into()).unwrap_or(0.0);
|
||||
let new = libc::itimerval {
|
||||
it_value: double_to_timeval(seconds),
|
||||
it_interval: double_to_timeval(interval),
|
||||
};
|
||||
let mut old = std::mem::MaybeUninit::<libc::itimerval>::uninit();
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
let ret = unsafe { ffi::setitimer(which, &new, old.as_mut_ptr()) };
|
||||
#[cfg(not(any(target_os = "linux", target_os = "android")))]
|
||||
let ret = unsafe { libc::setitimer(which, &new, old.as_mut_ptr()) };
|
||||
if ret != 0 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
let itimer_error = itimer_error(vm);
|
||||
return Err(vm.new_exception_msg(itimer_error, err.to_string()));
|
||||
}
|
||||
let old = unsafe { old.assume_init() };
|
||||
Ok(itimerval_to_tuple(&old))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[pyfunction]
|
||||
fn getitimer(which: i32, vm: &VirtualMachine) -> PyResult<(f64, f64)> {
|
||||
let mut old = std::mem::MaybeUninit::<libc::itimerval>::uninit();
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
let ret = unsafe { ffi::getitimer(which, old.as_mut_ptr()) };
|
||||
#[cfg(not(any(target_os = "linux", target_os = "android")))]
|
||||
let ret = unsafe { libc::getitimer(which, old.as_mut_ptr()) };
|
||||
if ret != 0 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
let itimer_error = itimer_error(vm);
|
||||
return Err(vm.new_exception_msg(itimer_error, err.to_string()));
|
||||
}
|
||||
let old = unsafe { old.assume_init() };
|
||||
Ok(itimerval_to_tuple(&old))
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn default_int_handler(
|
||||
_signum: PyObjectRef,
|
||||
|
||||
Reference in New Issue
Block a user