mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-09 22:49:57 +09:00
24
Lib/test/test_time.py
vendored
24
Lib/test/test_time.py
vendored
@@ -60,16 +60,16 @@ class TimeTestCase(unittest.TestCase):
|
||||
time.timezone
|
||||
time.tzname
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'get_clock_info'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement get_clock_info for Windows.")
|
||||
def test_time(self):
|
||||
time.time()
|
||||
info = time.get_clock_info('time')
|
||||
self.assertFalse(info.monotonic)
|
||||
self.assertTrue(info.adjustable)
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'monotonic_ns'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement monotonic_ns for Windows.")
|
||||
def test_time_ns_type(self):
|
||||
def check_ns(sec, ns):
|
||||
self.assertIsInstance(ns, int)
|
||||
@@ -476,8 +476,8 @@ class TimeTestCase(unittest.TestCase):
|
||||
pass
|
||||
self.assertEqual(time.strftime('%Z', tt), tzname)
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'get_clock_info'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement get_clock_info for Windows.")
|
||||
def test_monotonic(self):
|
||||
# monotonic() should not go backward
|
||||
times = [time.monotonic() for n in range(100)]
|
||||
@@ -504,8 +504,8 @@ class TimeTestCase(unittest.TestCase):
|
||||
def test_perf_counter(self):
|
||||
time.perf_counter()
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'get_clock_info'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement get_clock_info for Windows.")
|
||||
def test_process_time(self):
|
||||
# process_time() should not include time spend during a sleep
|
||||
start = time.process_time()
|
||||
@@ -519,8 +519,8 @@ class TimeTestCase(unittest.TestCase):
|
||||
self.assertTrue(info.monotonic)
|
||||
self.assertFalse(info.adjustable)
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'get_clock_info'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement get_clock_info for Windows.")
|
||||
def test_thread_time(self):
|
||||
if not hasattr(time, 'thread_time'):
|
||||
if sys.platform.startswith(('linux', 'win')):
|
||||
@@ -577,8 +577,8 @@ class TimeTestCase(unittest.TestCase):
|
||||
self.assertRaises(ValueError, time.localtime, float("nan"))
|
||||
self.assertRaises(ValueError, time.ctime, float("nan"))
|
||||
|
||||
# TODO: RUSTPYTHON, AttributeError: module 'time' has no attribute 'get_clock_info'
|
||||
@unittest.expectedFailure
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform == "win32", "Implement get_clock_info for Windows.")
|
||||
def test_get_clock_info(self):
|
||||
clocks = ['monotonic', 'perf_counter', 'process_time', 'time']
|
||||
|
||||
|
||||
@@ -90,12 +90,34 @@ mod time {
|
||||
Ok(Date::now() / 1000.0)
|
||||
}
|
||||
|
||||
#[cfg(any(windows, target_os = "wasi"))]
|
||||
#[pyfunction]
|
||||
fn monotonic(vm: &VirtualMachine) -> PyResult<f64> {
|
||||
// TODO: implement proper monotonic time!
|
||||
// TODO: implement proper monotonic time for other platforms.
|
||||
Ok(duration_since_system_now(vm)?.as_secs_f64())
|
||||
}
|
||||
|
||||
#[cfg(any(windows, target_os = "wasi"))]
|
||||
#[pyfunction]
|
||||
fn monotonic_ns(vm: &VirtualMachine) -> PyResult<u128> {
|
||||
// TODO: implement proper monotonic time for other platforms.
|
||||
Ok(duration_since_system_now(vm)?.as_nanos())
|
||||
}
|
||||
|
||||
#[cfg(any(windows, target_os = "wasi"))]
|
||||
#[pyfunction]
|
||||
fn perf_counter(vm: &VirtualMachine) -> PyResult<f64> {
|
||||
// TODO: implement proper monotonic time for other platforms.
|
||||
Ok(duration_since_system_now(vm)?.as_secs_f64())
|
||||
}
|
||||
|
||||
#[cfg(any(windows, target_os = "wasi"))]
|
||||
#[pyfunction]
|
||||
fn perf_counter_ns(vm: &VirtualMachine) -> PyResult<u128> {
|
||||
// TODO: implement proper monotonic time for other platforms.
|
||||
Ok(duration_since_system_now(vm)?.as_nanos())
|
||||
}
|
||||
|
||||
fn pyobj_to_naive_date_time(
|
||||
value: Either<f64, i64>,
|
||||
vm: &VirtualMachine,
|
||||
@@ -205,8 +227,8 @@ mod time {
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "linux",
|
||||
target_os = "openbsd",
|
||||
target_os = "solaris",
|
||||
target_os = "fuchsia",
|
||||
target_os = "emscripten",
|
||||
)))]
|
||||
fn get_thread_time(vm: &VirtualMachine) -> PyResult<std::time::Duration> {
|
||||
Err(vm.new_not_implemented_error("thread time unsupported in this system".to_owned()))
|
||||
@@ -372,23 +394,205 @@ mod time {
|
||||
#[cfg(unix)]
|
||||
#[pymodule(name = "time")]
|
||||
mod unix {
|
||||
use crate::{PyResult, VirtualMachine};
|
||||
#[allow(unused_imports)]
|
||||
use super::{SEC_TO_NS, US_TO_NS};
|
||||
#[cfg_attr(target_os = "macos", allow(unused_imports))]
|
||||
use crate::{
|
||||
builtins::{try_bigint_to_f64, PyFloat, PyIntRef, PyNamespace, PyStrRef},
|
||||
stdlib::os,
|
||||
utils::Either,
|
||||
PyRef, PyResult, VirtualMachine,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(target_os = "solaris")]
|
||||
#[pyattr]
|
||||
use libc::CLOCK_HIGHRES;
|
||||
#[cfg(not(any(
|
||||
target_os = "illumos",
|
||||
target_os = "netbsd",
|
||||
target_os = "solaris",
|
||||
target_os = "openbsd",
|
||||
)))]
|
||||
#[pyattr]
|
||||
use libc::CLOCK_PROCESS_CPUTIME_ID;
|
||||
#[cfg(not(any(
|
||||
target_os = "illumos",
|
||||
target_os = "netbsd",
|
||||
target_os = "solaris",
|
||||
target_os = "openbsd",
|
||||
target_os = "redox",
|
||||
)))]
|
||||
#[pyattr]
|
||||
use libc::CLOCK_THREAD_CPUTIME_ID;
|
||||
#[cfg(target_os = "linux")]
|
||||
#[pyattr]
|
||||
use libc::{CLOCK_BOOTTIME, CLOCK_MONOTONIC_RAW, CLOCK_TAI};
|
||||
#[pyattr]
|
||||
use libc::{
|
||||
CLOCK_MONOTONIC, CLOCK_PROCESS_CPUTIME_ID, CLOCK_REALTIME, CLOCK_THREAD_CPUTIME_ID,
|
||||
};
|
||||
#[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
|
||||
use libc::{CLOCK_MONOTONIC, CLOCK_REALTIME};
|
||||
#[cfg(any(target_os = "freebsd", target_os = "openbsd", target_os = "dragonfly"))]
|
||||
#[pyattr]
|
||||
use libc::{CLOCK_PROF, CLOCK_UPTIME};
|
||||
|
||||
fn get_clock_time(clk_id: PyIntRef, vm: &VirtualMachine) -> PyResult<Duration> {
|
||||
let mut timespec = std::mem::MaybeUninit::uninit();
|
||||
let ts: libc::timespec = unsafe {
|
||||
if libc::clock_gettime(clk_id.try_to_primitive(vm)?, timespec.as_mut_ptr()) == -1 {
|
||||
return Err(os::errno_err(vm));
|
||||
}
|
||||
timespec.assume_init()
|
||||
};
|
||||
Ok(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32))
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn clock_gettime(clk_id: PyIntRef, vm: &VirtualMachine) -> PyResult<f64> {
|
||||
get_clock_time(clk_id, vm).map(|d| d.as_secs_f64())
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn clock_gettime_ns(clk_id: PyIntRef, vm: &VirtualMachine) -> PyResult<u128> {
|
||||
get_clock_time(clk_id, vm).map(|d| d.as_nanos())
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "redox"))]
|
||||
#[pyfunction]
|
||||
fn clock_getres(clk_id: PyIntRef, vm: &VirtualMachine) -> PyResult<f64> {
|
||||
let mut timespec = std::mem::MaybeUninit::uninit();
|
||||
let ts: libc::timespec = unsafe {
|
||||
if libc::clock_getres(clk_id.try_to_primitive(vm)?, timespec.as_mut_ptr()) == -1 {
|
||||
return Err(os::errno_err(vm));
|
||||
}
|
||||
timespec.assume_init()
|
||||
};
|
||||
Ok(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32).as_secs_f64())
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "redox")))]
|
||||
fn set_clock_time(
|
||||
clk_id: PyIntRef,
|
||||
timespec: libc::timespec,
|
||||
vm: &VirtualMachine,
|
||||
) -> PyResult<()> {
|
||||
let res = unsafe { libc::clock_settime(clk_id.try_to_primitive(vm)?, ×pec) };
|
||||
if res == -1 {
|
||||
return Err(os::errno_err(vm));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "redox")))]
|
||||
#[pyfunction]
|
||||
fn clock_settime(
|
||||
clk_id: PyIntRef,
|
||||
time: Either<PyRef<PyFloat>, PyIntRef>,
|
||||
vm: &VirtualMachine,
|
||||
) -> PyResult<()> {
|
||||
let time = match time {
|
||||
Either::A(f) => f.to_f64(),
|
||||
Either::B(z) => try_bigint_to_f64(z.as_bigint(), vm)?,
|
||||
};
|
||||
let nanos = time.fract() * (SEC_TO_NS as f64);
|
||||
let ts = libc::timespec {
|
||||
tv_sec: time.floor() as libc::time_t,
|
||||
tv_nsec: nanos as _,
|
||||
};
|
||||
set_clock_time(clk_id, ts, vm)
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "redox")))]
|
||||
#[pyfunction]
|
||||
fn clock_settime_ns(clk_id: PyIntRef, time: PyIntRef, vm: &VirtualMachine) -> PyResult<()> {
|
||||
let time: libc::time_t = time.try_to_primitive(vm)?;
|
||||
let ts = libc::timespec {
|
||||
tv_sec: time / (SEC_TO_NS as libc::time_t),
|
||||
tv_nsec: time.rem_euclid(SEC_TO_NS as libc::time_t) as _,
|
||||
};
|
||||
set_clock_time(clk_id, ts, vm)
|
||||
}
|
||||
|
||||
// Requires all CLOCK constants available and clock_getres
|
||||
#[cfg(any(
|
||||
target_os = "macos",
|
||||
target_os = "android",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "fuchsia",
|
||||
target_os = "emscripten",
|
||||
target_os = "linux",
|
||||
))]
|
||||
#[pyfunction]
|
||||
fn get_clock_info(name: PyStrRef, vm: &VirtualMachine) -> PyResult<PyRef<PyNamespace>> {
|
||||
let (adj, imp, mono, res) = match name.as_ref() {
|
||||
"monotonic" | "perf_counter" => (
|
||||
false,
|
||||
"time.clock_gettime(CLOCK_MONOTONIC)",
|
||||
true,
|
||||
clock_getres(vm.ctx.new_int(CLOCK_MONOTONIC), vm)?,
|
||||
),
|
||||
"process_time" => (
|
||||
false,
|
||||
"time.clock_gettime(CLOCK_PROCESS_CPUTIME_ID)",
|
||||
true,
|
||||
clock_getres(vm.ctx.new_int(CLOCK_PROCESS_CPUTIME_ID), vm)?,
|
||||
),
|
||||
"thread_time" => (
|
||||
false,
|
||||
"time.clock_gettime(CLOCK_THREAD_CPUTIME_ID)",
|
||||
true,
|
||||
clock_getres(vm.ctx.new_int(CLOCK_THREAD_CPUTIME_ID), vm)?,
|
||||
),
|
||||
"time" => (
|
||||
true,
|
||||
"time.clock_gettime(CLOCK_REALTIME)",
|
||||
false,
|
||||
clock_getres(vm.ctx.new_int(CLOCK_REALTIME), vm)?,
|
||||
),
|
||||
_ => return Err(vm.new_value_error("unknown clock".to_owned())),
|
||||
};
|
||||
|
||||
Ok(py_namespace!(vm, {
|
||||
"implementation" => vm.new_pyobj(imp),
|
||||
"monotonic" => vm.ctx.new_bool(mono),
|
||||
"adjustable" => vm.ctx.new_bool(adj),
|
||||
"resolution" => vm.ctx.new_float(res),
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(not(any(
|
||||
target_os = "macos",
|
||||
target_os = "android",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "fuchsia",
|
||||
target_os = "emscripten",
|
||||
target_os = "linux",
|
||||
)))]
|
||||
#[pyfunction]
|
||||
fn get_clock_info(_name: PyStrRef, vm: &VirtualMachine) -> PyResult<PyNamespace> {
|
||||
Err(vm.new_not_implemented_error("get_clock_info unsupported on this system".to_owned()))
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn monotonic(vm: &VirtualMachine) -> PyResult<f64> {
|
||||
clock_gettime(vm.ctx.new_int(CLOCK_MONOTONIC), vm)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn monotonic_ns(vm: &VirtualMachine) -> PyResult<u128> {
|
||||
clock_gettime_ns(vm.ctx.new_int(CLOCK_MONOTONIC), vm)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn perf_counter(vm: &VirtualMachine) -> PyResult<f64> {
|
||||
clock_gettime(vm.ctx.new_int(CLOCK_MONOTONIC), vm)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn perf_counter_ns(vm: &VirtualMachine) -> PyResult<u128> {
|
||||
clock_gettime_ns(vm.ctx.new_int(CLOCK_MONOTONIC), vm)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn sleep(dur: Duration, vm: &VirtualMachine) -> PyResult<()> {
|
||||
// this is basically std::thread::sleep, but that catches interrupts and we don't want to;
|
||||
@@ -407,18 +611,16 @@ mod unix {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "macos",
|
||||
target_os = "android",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "linux",
|
||||
#[cfg(not(any(
|
||||
target_os = "illumos",
|
||||
target_os = "netbsd",
|
||||
target_os = "openbsd",
|
||||
))]
|
||||
target_os = "redox"
|
||||
)))]
|
||||
pub(super) fn get_thread_time(vm: &VirtualMachine) -> PyResult<Duration> {
|
||||
let time: libc::timespec = unsafe {
|
||||
let mut time = std::mem::MaybeUninit::uninit();
|
||||
if libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, time.as_mut_ptr()) == -1 {
|
||||
if libc::clock_gettime(CLOCK_THREAD_CPUTIME_ID, time.as_mut_ptr()) == -1 {
|
||||
return Err(vm.new_os_error("Failed to get clock time".to_owned()));
|
||||
}
|
||||
time.assume_init()
|
||||
@@ -431,22 +633,14 @@ mod unix {
|
||||
Ok(Duration::from_nanos(unsafe { libc::gethrvtime() }))
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "macos",
|
||||
target_os = "android",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "linux",
|
||||
target_os = "redox",
|
||||
))]
|
||||
#[cfg(not(any(
|
||||
target_os = "illumos",
|
||||
target_os = "netbsd",
|
||||
target_os = "solaris",
|
||||
target_os = "openbsd",
|
||||
)))]
|
||||
pub(super) fn get_process_time(vm: &VirtualMachine) -> PyResult<Duration> {
|
||||
let time: libc::timespec = unsafe {
|
||||
#[cfg(not(target_os = "redox"))]
|
||||
use libc::CLOCK_PROCESS_CPUTIME_ID;
|
||||
#[cfg(target_os = "redox")]
|
||||
// TODO: will be upstreamed to libc sometime soon
|
||||
const CLOCK_PROCESS_CPUTIME_ID: libc::clockid_t = 2;
|
||||
|
||||
let mut time = std::mem::MaybeUninit::uninit();
|
||||
if libc::clock_gettime(CLOCK_PROCESS_CPUTIME_ID, time.as_mut_ptr()) == -1 {
|
||||
return Err(vm.new_os_error("Failed to get clock time".to_owned()));
|
||||
@@ -464,8 +658,6 @@ mod unix {
|
||||
))]
|
||||
pub(super) fn get_process_time(vm: &VirtualMachine) -> PyResult<Duration> {
|
||||
fn from_timeval(tv: libc::timeval, vm: &VirtualMachine) -> PyResult<i64> {
|
||||
use super::decl::{SEC_TO_NS, US_TO_NS};
|
||||
|
||||
(|tv: libc::timeval| {
|
||||
let t = tv.tv_sec.checked_mul(SEC_TO_NS)?;
|
||||
let u = (tv.tv_usec as i64).checked_mul(US_TO_NS)?;
|
||||
|
||||
Reference in New Issue
Block a user