impl more nt (#6984)

* mpl new features

* windows encodings

* impl nt functions

* revert

* codecs

* fix codecs
This commit is contained in:
Jeong, YunWon
2026-02-04 09:53:02 +09:00
committed by GitHub
parent 9f0cd323b6
commit c045593e4e
17 changed files with 1188 additions and 622 deletions

View File

@@ -65,9 +65,6 @@ class BufferSizeTest:
class CBufferSizeTest(BufferSizeTest, unittest.TestCase):
open = io.open
# TODO: RUSTPYTHON
import sys
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, can't cleanup temporary file on Windows")
class PyBufferSizeTest(BufferSizeTest, unittest.TestCase):
open = staticmethod(pyio.open)

View File

@@ -620,7 +620,6 @@ class CmdLineTest(unittest.TestCase):
# Confirm that the caret is located under the '=' sign
self.assertIn("\n ^^^^^\n", text)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_syntaxerror_indented_caret_position(self):
script = textwrap.dedent("""\
if True:

View File

@@ -3451,7 +3451,6 @@ class ExceptionNotesTest(unittest.TestCase):
class CodePageTest(unittest.TestCase):
CP_UTF8 = 65001
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_invalid_code_page(self):
self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
@@ -3670,7 +3669,6 @@ class CodePageTest(unittest.TestCase):
('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
))
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_code_page_decode_flags(self):
# Issue #36312: For some code pages (e.g. UTF-7) flags for
# MultiByteToWideChar() must be set to 0.

View File

@@ -722,7 +722,6 @@ class CommandLineTestsBase:
self.assertCompiled(spamfn)
self.assertCompiled(eggfn)
@unittest.skipIf(sys.platform == 'win32', 'TODO: RUSTPYTHON hangs')
@os_helper.skip_unless_symlink
def test_symlink_loop(self):
# Currently, compileall ignores symlinks to directories.

View File

@@ -209,7 +209,7 @@ class ProcessPoolForkWaitTest(ProcessPoolForkWaitTest): # TODO: RUSTPYTHON
def test_first_exception(self): super().test_first_exception() # TODO: RUSTPYTHON
@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky")
def test_first_exception_one_already_failed(self): super().test_first_exception_one_already_failed() # TODO: RUSTPYTHON
@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault")
@unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON flaky")
def test_first_exception_some_already_complete(self): super().test_first_exception_some_already_complete() # TODO: RUSTPYTHON
@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault")
def test_timeout(self): super().test_timeout() # TODO: RUSTPYTHON

View File

@@ -747,7 +747,6 @@ class ExceptionTests(unittest.TestCase):
x = DerivedException(fancy_arg=42)
self.assertEqual(x.fancy_arg, 42)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; Windows")
@no_tracing
def testInfiniteRecursion(self):
def f():
@@ -1415,7 +1414,6 @@ class ExceptionTests(unittest.TestCase):
exc = UnicodeTranslateError("x", 0, 1, Evil("reason"))
self.assertRaises(TypeError, str, exc)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; Windows")
@no_tracing
def test_badisinstance(self):
# Bug #2542: if issubclass(e, MyException) raises an exception,
@@ -1700,7 +1698,6 @@ class ExceptionTests(unittest.TestCase):
gc_collect() # For PyPy or other GCs.
self.assertEqual(wr(), None)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; Windows")
@no_tracing
def test_recursion_error_cleanup(self):
# Same test as above, but with "recursion exceeded" errors
@@ -1722,7 +1719,6 @@ class ExceptionTests(unittest.TestCase):
gc_collect() # For PyPy or other GCs.
self.assertEqual(wr(), None)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; error specific to cpython")
def test_errno_ENOTDIR(self):
# Issue #12802: "not a directory" errors are ENOTDIR even on Windows
with self.assertRaises(OSError) as cm:

3
Lib/test/test_os.py vendored
View File

@@ -3630,7 +3630,6 @@ class SpawnTests(unittest.TestCase):
exitcode = os.spawnl(os.P_WAIT, program, *args)
self.assertEqual(exitcode, self.exitcode)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; fix spawnve on Windows")
@requires_os_func('spawnle')
def test_spawnle(self):
program, args = self.create_args(with_env=True)
@@ -3659,7 +3658,6 @@ class SpawnTests(unittest.TestCase):
exitcode = os.spawnv(os.P_WAIT, FakePath(program), args)
self.assertEqual(exitcode, self.exitcode)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; fix spawnve on Windows")
@requires_os_func('spawnve')
def test_spawnve(self):
program, args = self.create_args(with_env=True)
@@ -3767,7 +3765,6 @@ class SpawnTests(unittest.TestCase):
exitcode = spawn(os.P_WAIT, program, args, newenv)
self.assertEqual(exitcode, 0)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; fix spawnve on Windows")
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
self._test_invalid_env(os.spawnve)

View File

@@ -1074,7 +1074,6 @@ class PosixTester(unittest.TestCase):
self.check_chmod_link(posix.chmod, target, link)
self.check_chmod_link(posix.chmod, target, link, follow_symlinks=True)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; flaky")
@os_helper.skip_unless_symlink
def test_chmod_dir_symlink(self):
target = self.tempdir()

View File

@@ -680,7 +680,6 @@ class RunPathTestCase(unittest.TestCase, CodeExecutionMixin):
self._check_script(script_name, "<run_path>", script_name,
script_name, expect_spec=False)
@unittest.skipIf(sys.platform == 'win32', "TODO: RUSTPYTHON; weird panic in lz4-flex")
def test_script_compiled(self):
with temp_dir() as script_dir:
mod_name = 'script'

View File

@@ -82,7 +82,6 @@ class TestScriptHelperEnvironment(unittest.TestCase):
# Reset the private cached state.
script_helper.__dict__['__cached_interp_requires_environment'] = None
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
@mock.patch('subprocess.check_call')
def test_interpreter_requires_environment_true(self, mock_check_call):
with mock.patch.dict(os.environ):
@@ -92,7 +91,6 @@ class TestScriptHelperEnvironment(unittest.TestCase):
self.assertTrue(script_helper.interpreter_requires_environment())
self.assertEqual(1, mock_check_call.call_count)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
@mock.patch('subprocess.check_call')
def test_interpreter_requires_environment_false(self, mock_check_call):
with mock.patch.dict(os.environ):
@@ -102,7 +100,6 @@ class TestScriptHelperEnvironment(unittest.TestCase):
self.assertFalse(script_helper.interpreter_requires_environment())
self.assertEqual(1, mock_check_call.call_count)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
@mock.patch('subprocess.check_call')
def test_interpreter_requires_environment_details(self, mock_check_call):
with mock.patch.dict(os.environ):
@@ -115,7 +112,6 @@ class TestScriptHelperEnvironment(unittest.TestCase):
self.assertEqual(sys.executable, check_call_command[0])
self.assertIn('-E', check_call_command)
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
@mock.patch('subprocess.check_call')
def test_interpreter_requires_environment_with_pythonhome(self, mock_check_call):
with mock.patch.dict(os.environ):

View File

@@ -1902,7 +1902,7 @@ class RunFuncTestCase(BaseTestCase):
res = subprocess.run(args)
self.assertEqual(res.returncode, 57)
@unittest.skipIf(mswindows, 'TODO: RUSTPYTHON; Flakey')
@unittest.skipIf(mswindows, 'TODO: RUSTPYTHON; empty env block fails nondeterministically')
@unittest.skipUnless(mswindows, "Maybe test trigger a leak on Ubuntu")
def test_run_with_an_empty_env(self):
# gh-105436: fix subprocess.run(..., env={}) broken on Windows

View File

@@ -2253,7 +2253,6 @@ class FinalizeTestCase(unittest.TestCase):
assert f3.atexit == True
assert f4.atexit == True
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON; Windows")
def test_atexit(self):
prog = ('from test.test_weakref import FinalizeTestCase;'+
'FinalizeTestCase.run_in_child()')

View File

@@ -567,7 +567,6 @@ class TestPath(unittest.TestCase):
file = cls(alpharep).joinpath('some dir').parent
assert isinstance(file, cls)
@unittest.skipIf(sys.platform == 'win32', 'TODO: RUSTPYTHON; fails on Windows')
@parameterize(
['alpharep', 'path_type', 'subpath'],
itertools.product(

View File

@@ -210,8 +210,8 @@ impl VirtualMachine {
}
if let Some(text) = maybe_text {
// if text ends with \n, remove it
let r_text = text.as_str().trim_end_matches('\n');
// if text ends with \n or \r\n, remove it
let r_text = text.as_str().trim_end_matches(['\n', '\r']);
let l_text = r_text.trim_start_matches([' ', '\n', '\x0c']); // \x0c is \f
let spaces = (r_text.len() - l_text.len()) as isize;

File diff suppressed because it is too large Load Diff

View File

@@ -18,7 +18,7 @@ pub(crate) mod module {
use libc::intptr_t;
use std::os::windows::io::AsRawHandle;
use std::{env, fs, io, mem::MaybeUninit, os::windows::ffi::OsStringExt};
use std::{env, io, mem::MaybeUninit, os::windows::ffi::OsStringExt};
use windows_sys::Win32::{
Foundation::{self, INVALID_HANDLE_VALUE},
Storage::FileSystem,
@@ -124,8 +124,12 @@ pub(crate) mod module {
#[pyfunction]
pub(super) fn _supports_virtual_terminal() -> PyResult<bool> {
// TODO: implement this
Ok(true)
let mut mode = 0;
let handle = unsafe { Console::GetStdHandle(Console::STD_ERROR_HANDLE) };
if unsafe { Console::GetConsoleMode(handle, &mut mode) } == 0 {
return Ok(false);
}
Ok(mode & Console::ENABLE_VIRTUAL_TERMINAL_PROCESSING != 0)
}
#[derive(FromArgs)]
@@ -140,20 +144,78 @@ pub(crate) mod module {
#[pyfunction]
pub(super) fn symlink(args: SymlinkArgs<'_>, vm: &VirtualMachine) -> PyResult<()> {
use std::os::windows::fs as win_fs;
let dir = args.target_is_directory.target_is_directory
|| args
.dst
.as_path()
.parent()
.and_then(|dst_parent| dst_parent.join(&args.src).symlink_metadata().ok())
.is_some_and(|meta| meta.is_dir());
let res = if dir {
win_fs::symlink_dir(args.src.path, args.dst.path)
} else {
win_fs::symlink_file(args.src.path, args.dst.path)
use crate::exceptions::ToOSErrorBuilder;
use std::sync::atomic::{AtomicBool, Ordering};
use windows_sys::Win32::Storage::FileSystem::WIN32_FILE_ATTRIBUTE_DATA;
use windows_sys::Win32::Storage::FileSystem::{
CreateSymbolicLinkW, FILE_ATTRIBUTE_DIRECTORY, GetFileAttributesExW,
SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE, SYMBOLIC_LINK_FLAG_DIRECTORY,
};
res.map_err(|err| err.to_pyexception(vm))
static HAS_UNPRIVILEGED_FLAG: AtomicBool = AtomicBool::new(true);
fn check_dir(src: &OsPath, dst: &OsPath) -> bool {
use windows_sys::Win32::Storage::FileSystem::GetFileExInfoStandard;
let dst_parent = dst.as_path().parent();
let Some(dst_parent) = dst_parent else {
return false;
};
let resolved = if src.as_path().is_absolute() {
src.as_path().to_path_buf()
} else {
dst_parent.join(src.as_path())
};
let wide = match widestring::WideCString::from_os_str(&resolved) {
Ok(wide) => wide,
Err(_) => return false,
};
let mut info: WIN32_FILE_ATTRIBUTE_DATA = unsafe { std::mem::zeroed() };
let ok = unsafe {
GetFileAttributesExW(
wide.as_ptr(),
GetFileExInfoStandard,
&mut info as *mut _ as *mut _,
)
};
ok != 0 && (info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0
}
let mut flags = 0u32;
if HAS_UNPRIVILEGED_FLAG.load(Ordering::Relaxed) {
flags |= SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE;
}
if args.target_is_directory.target_is_directory || check_dir(&args.src, &args.dst) {
flags |= SYMBOLIC_LINK_FLAG_DIRECTORY;
}
let src = args.src.to_wide_cstring(vm)?;
let dst = args.dst.to_wide_cstring(vm)?;
let mut result = unsafe { CreateSymbolicLinkW(dst.as_ptr(), src.as_ptr(), flags) };
if !result
&& HAS_UNPRIVILEGED_FLAG.load(Ordering::Relaxed)
&& unsafe { Foundation::GetLastError() } == Foundation::ERROR_INVALID_PARAMETER
{
let flags = flags & !SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE;
result = unsafe { CreateSymbolicLinkW(dst.as_ptr(), src.as_ptr(), flags) };
if result
|| unsafe { Foundation::GetLastError() } != Foundation::ERROR_INVALID_PARAMETER
{
HAS_UNPRIVILEGED_FLAG.store(false, Ordering::Relaxed);
}
}
if !result {
let err = io::Error::last_os_error();
let builder = err.to_os_error_builder(vm);
let builder = builder
.filename(args.src.filename(vm))
.filename2(args.dst.filename(vm));
return Err(builder.build(vm).upcast());
}
Ok(())
}
#[pyfunction]
@@ -173,8 +235,7 @@ pub(crate) mod module {
for (key, value) in env::vars() {
// Skip hidden Windows environment variables (e.g., =C:, =D:, =ExitCode)
// These are internal cmd.exe bookkeeping variables that store per-drive
// current directories. They cannot be modified via _wputenv() and should
// not be exposed to Python code.
// current directories and cannot be reliably modified via _wputenv().
if key.starts_with('=') {
continue;
}
@@ -209,22 +270,17 @@ pub(crate) mod module {
const S_IWRITE: u32 = 128;
fn fchmod_impl(fd: i32, mode: u32, vm: &VirtualMachine) -> PyResult<()> {
fn win32_hchmod(handle: Foundation::HANDLE, mode: u32, vm: &VirtualMachine) -> PyResult<()> {
use windows_sys::Win32::Storage::FileSystem::{
FILE_BASIC_INFO, FileBasicInfo, GetFileInformationByHandleEx,
SetFileInformationByHandle,
};
// Get Windows HANDLE from fd
let borrowed = unsafe { crt_fd::Borrowed::borrow_raw(fd) };
let handle = crt_fd::as_handle(borrowed).map_err(|e| e.to_pyexception(vm))?;
let hfile = handle.as_raw_handle() as Foundation::HANDLE;
// Get current file info
let mut info: FILE_BASIC_INFO = unsafe { std::mem::zeroed() };
let ret = unsafe {
GetFileInformationByHandleEx(
hfile,
handle,
FileBasicInfo,
&mut info as *mut _ as *mut _,
std::mem::size_of::<FILE_BASIC_INFO>() as u32,
@@ -244,7 +300,7 @@ pub(crate) mod module {
// Set the new attributes
let ret = unsafe {
SetFileInformationByHandle(
hfile,
handle,
FileBasicInfo,
&info as *const _ as *const _,
std::mem::size_of::<FILE_BASIC_INFO>() as u32,
@@ -257,6 +313,36 @@ pub(crate) mod module {
Ok(())
}
fn fchmod_impl(fd: i32, mode: u32, vm: &VirtualMachine) -> PyResult<()> {
// Get Windows HANDLE from fd
let borrowed = unsafe { crt_fd::Borrowed::borrow_raw(fd) };
let handle = crt_fd::as_handle(borrowed).map_err(|e| e.to_pyexception(vm))?;
let hfile = handle.as_raw_handle() as Foundation::HANDLE;
win32_hchmod(hfile, mode, vm)
}
fn win32_lchmod(path: &OsPath, mode: u32, vm: &VirtualMachine) -> PyResult<()> {
use windows_sys::Win32::Storage::FileSystem::{GetFileAttributesW, SetFileAttributesW};
let wide = path.to_wide_cstring(vm)?;
let attr = unsafe { GetFileAttributesW(wide.as_ptr()) };
if attr == FileSystem::INVALID_FILE_ATTRIBUTES {
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path.clone(), vm));
}
let new_attr = if mode & S_IWRITE != 0 {
attr & !FileSystem::FILE_ATTRIBUTE_READONLY
} else {
attr | FileSystem::FILE_ATTRIBUTE_READONLY
};
let ret = unsafe { SetFileAttributesW(wide.as_ptr(), new_attr) };
if ret == 0 {
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path.clone(), vm));
}
Ok(())
}
#[pyfunction]
fn fchmod(fd: i32, mode: u32, vm: &VirtualMachine) -> PyResult<()> {
fchmod_impl(fd, mode, vm)
@@ -286,45 +372,36 @@ pub(crate) mod module {
unreachable!()
};
// On Windows, os.chmod behavior differs based on whether follow_symlinks is explicitly provided:
// - Not provided (default): use SetFileAttributesW on the path directly (doesn't follow symlinks)
// - Explicitly True: resolve symlink first, then apply permissions to target
// - Explicitly False: raise NotImplementedError (Windows can't change symlink permissions)
let actual_path: std::borrow::Cow<'_, std::path::Path> = match follow_symlinks.into_option()
{
None => {
// Default behavior: don't resolve symlinks, operate on path directly
std::borrow::Cow::Borrowed(path.as_ref())
}
Some(true) => {
// Explicitly follow symlinks: resolve the path first
match fs::canonicalize(&path) {
Ok(p) => std::borrow::Cow::Owned(p),
Err(_) => std::borrow::Cow::Borrowed(path.as_ref()),
}
}
Some(false) => {
// follow_symlinks=False on Windows - not supported for symlinks
// Check if path is a symlink
if let Ok(meta) = fs::symlink_metadata(&path)
&& meta.file_type().is_symlink()
{
return Err(vm.new_not_implemented_error(
"chmod: follow_symlinks=False is not supported on Windows for symlinks"
.to_owned(),
));
}
std::borrow::Cow::Borrowed(path.as_ref())
}
};
let follow_symlinks = follow_symlinks.into_option().unwrap_or(false);
// Use symlink_metadata to avoid following dangling symlinks
let meta = fs::symlink_metadata(&actual_path)
.map_err(|err| OSErrorBuilder::with_filename(&err, path.clone(), vm))?;
let mut permissions = meta.permissions();
permissions.set_readonly(mode & S_IWRITE == 0);
fs::set_permissions(&*actual_path, permissions)
.map_err(|err| OSErrorBuilder::with_filename(&err, path, vm))
if follow_symlinks {
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE,
FILE_SHARE_READ, FILE_SHARE_WRITE, FILE_WRITE_ATTRIBUTES, OPEN_EXISTING,
};
let wide = path.to_wide_cstring(vm)?;
let handle = unsafe {
CreateFileW(
wide.as_ptr(),
FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
core::ptr::null(),
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS,
std::ptr::null_mut(),
)
};
if handle == INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path, vm));
}
let result = win32_hchmod(handle, mode, vm);
unsafe { Foundation::CloseHandle(handle) };
result
} else {
win32_lchmod(&path, mode, vm)
}
}
/// Get the real file name (with correct case) without accessing the file.
@@ -342,10 +419,8 @@ pub(crate) mod module {
let handle = unsafe { FindFirstFileW(wide_path.as_ptr(), &mut find_data) };
if handle == INVALID_HANDLE_VALUE {
return Err(vm.new_os_error(format!(
"FindFirstFileW failed for path: {}",
path.as_ref().display()
)));
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path, vm));
}
unsafe { FindClose(handle) };
@@ -382,6 +457,8 @@ pub(crate) mod module {
const PY_IFDIR: u32 = 2; // Directory
const PY_IFLNK: u32 = 4; // Symlink
const PY_IFMNT: u32 = 8; // Mount point (junction)
const PY_IFLRP: u32 = 16; // Link Reparse Point (name-surrogate, symlink, junction)
const PY_IFRRP: u32 = 32; // Regular Reparse Point
/// _testInfo - determine file type based on attributes and reparse tag
fn _test_info(attributes: u32, reparse_tag: u32, disk_device: bool, tested_type: u32) -> bool {
@@ -406,10 +483,38 @@ pub(crate) mod module {
(attributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0
&& reparse_tag == IO_REPARSE_TAG_MOUNT_POINT
}
PY_IFLRP => {
(attributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0
&& is_reparse_tag_name_surrogate(reparse_tag)
}
PY_IFRRP => {
(attributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0
&& reparse_tag != 0
&& !is_reparse_tag_name_surrogate(reparse_tag)
}
_ => false,
}
}
fn is_reparse_tag_name_surrogate(tag: u32) -> bool {
(tag & 0x20000000) != 0
}
fn file_info_error_is_trustworthy(error: u32) -> bool {
use windows_sys::Win32::Foundation;
matches!(
error,
Foundation::ERROR_FILE_NOT_FOUND
| Foundation::ERROR_PATH_NOT_FOUND
| Foundation::ERROR_NOT_READY
| Foundation::ERROR_BAD_NET_NAME
| Foundation::ERROR_BAD_NETPATH
| Foundation::ERROR_BAD_PATHNAME
| Foundation::ERROR_INVALID_NAME
| Foundation::ERROR_FILENAME_EXCED_RANGE
)
}
/// _testFileTypeByHandle - test file type using an open handle
fn _test_file_type_by_handle(
handle: windows_sys::Win32::Foundation::HANDLE,
@@ -467,41 +572,60 @@ pub(crate) mod module {
/// _testFileTypeByName - test file type by path name
fn _test_file_type_by_name(path: &std::path::Path, tested_type: u32) -> bool {
use crate::common::fileutils::windows::{
FILE_INFO_BY_NAME_CLASS, get_file_information_by_name,
};
use crate::common::windows::ToWideString;
use windows_sys::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE};
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OPEN_REPARSE_POINT,
FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE,
OPEN_EXISTING,
CreateFileW, FILE_ATTRIBUTE_REPARSE_POINT, FILE_FLAG_BACKUP_SEMANTICS,
FILE_FLAG_OPEN_REPARSE_POINT, FILE_READ_ATTRIBUTES, OPEN_EXISTING,
};
use windows_sys::Win32::Storage::FileSystem::{FILE_DEVICE_CD_ROM, FILE_DEVICE_DISK};
use windows_sys::Win32::System::Ioctl::FILE_DEVICE_VIRTUAL_DISK;
// For islink/isjunction, use symlink_metadata to check reparse points
if (tested_type == PY_IFLNK || tested_type == PY_IFMNT)
&& let Ok(meta) = path.symlink_metadata()
{
use std::os::windows::fs::MetadataExt;
let attrs = meta.file_attributes();
use windows_sys::Win32::Storage::FileSystem::FILE_ATTRIBUTE_REPARSE_POINT;
if (attrs & FILE_ATTRIBUTE_REPARSE_POINT) == 0 {
return false;
match get_file_information_by_name(
path.as_os_str(),
FILE_INFO_BY_NAME_CLASS::FileStatBasicByNameInfo,
) {
Ok(info) => {
let disk_device = matches!(
info.DeviceType,
FILE_DEVICE_DISK | FILE_DEVICE_VIRTUAL_DISK | FILE_DEVICE_CD_ROM
);
let result = _test_info(
info.FileAttributes,
info.ReparseTag,
disk_device,
tested_type,
);
if !result
|| (tested_type != PY_IFREG && tested_type != PY_IFDIR)
|| (info.FileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) == 0
{
return result;
}
}
Err(err) => {
if let Some(code) = err.raw_os_error()
&& file_info_error_is_trustworthy(code as u32)
{
return false;
}
}
// Need to check reparse tag, fall through to CreateFileW
}
let wide_path = path.to_wide_with_nul();
// For symlinks/junctions, add FILE_FLAG_OPEN_REPARSE_POINT to not follow
let mut flags = FILE_FLAG_BACKUP_SEMANTICS;
if tested_type != PY_IFREG && tested_type != PY_IFDIR {
flags |= FILE_FLAG_OPEN_REPARSE_POINT;
}
// Use sharing flags to avoid access denied errors
let handle = unsafe {
CreateFileW(
wide_path.as_ptr(),
FILE_READ_ATTRIBUTES,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
0,
core::ptr::null(),
OPEN_EXISTING,
flags,
@@ -509,98 +633,122 @@ pub(crate) mod module {
)
};
if handle == INVALID_HANDLE_VALUE {
// Fallback: try using Rust's metadata for isdir/isfile
if tested_type == PY_IFDIR {
return path.metadata().is_ok_and(|m| m.is_dir());
} else if tested_type == PY_IFREG {
return path.metadata().is_ok_and(|m| m.is_file());
}
// For symlinks/junctions, try without FILE_FLAG_BACKUP_SEMANTICS
let handle = unsafe {
CreateFileW(
wide_path.as_ptr(),
FILE_READ_ATTRIBUTES,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
core::ptr::null(),
OPEN_EXISTING,
FILE_FLAG_OPEN_REPARSE_POINT,
std::ptr::null_mut(),
)
};
if handle == INVALID_HANDLE_VALUE {
return false;
}
let result = _test_file_type_by_handle(handle, tested_type, true);
if handle != INVALID_HANDLE_VALUE {
let result = _test_file_type_by_handle(handle, tested_type, false);
unsafe { CloseHandle(handle) };
return result;
}
let result = _test_file_type_by_handle(handle, tested_type, true);
unsafe { CloseHandle(handle) };
result
match unsafe { windows_sys::Win32::Foundation::GetLastError() } {
windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED
| windows_sys::Win32::Foundation::ERROR_SHARING_VIOLATION
| windows_sys::Win32::Foundation::ERROR_CANT_ACCESS_FILE
| windows_sys::Win32::Foundation::ERROR_INVALID_PARAMETER => {
let stat = if tested_type == PY_IFREG || tested_type == PY_IFDIR {
crate::windows::win32_xstat(path.as_os_str(), true)
} else {
crate::windows::win32_xstat(path.as_os_str(), false)
};
if let Ok(st) = stat {
let disk_device = (st.st_mode & libc::S_IFREG as u16) != 0;
return _test_info(
st.st_file_attributes,
st.st_reparse_tag,
disk_device,
tested_type,
);
}
}
_ => {}
}
false
}
/// _testFileExistsByName - test if path exists
fn _test_file_exists_by_name(path: &std::path::Path, follow_links: bool) -> bool {
use crate::common::fileutils::windows::{
FILE_INFO_BY_NAME_CLASS, get_file_information_by_name,
};
use crate::common::windows::ToWideString;
use windows_sys::Win32::Foundation::{CloseHandle, GENERIC_READ, INVALID_HANDLE_VALUE};
use windows_sys::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE};
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OPEN_REPARSE_POINT,
FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE,
OPEN_EXISTING,
CreateFileW, FILE_ATTRIBUTE_REPARSE_POINT, FILE_FLAG_BACKUP_SEMANTICS,
FILE_FLAG_OPEN_REPARSE_POINT, FILE_READ_ATTRIBUTES, OPEN_EXISTING,
};
// First try standard Rust exists/symlink_metadata (handles \\?\ paths well)
if follow_links {
if path.exists() {
return true;
match get_file_information_by_name(
path.as_os_str(),
FILE_INFO_BY_NAME_CLASS::FileStatBasicByNameInfo,
) {
Ok(info) => {
if (info.FileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) == 0
|| (!follow_links && is_reparse_tag_name_surrogate(info.ReparseTag))
{
return true;
}
}
Err(err) => {
if let Some(code) = err.raw_os_error()
&& file_info_error_is_trustworthy(code as u32)
{
return false;
}
}
} else if path.symlink_metadata().is_ok() {
return true;
}
let wide_path = path.to_wide_with_nul();
let mut flags = FILE_FLAG_BACKUP_SEMANTICS;
if !follow_links {
flags |= FILE_FLAG_OPEN_REPARSE_POINT;
}
// Fallback: try with FILE_READ_ATTRIBUTES and sharing flags
let handle = unsafe {
CreateFileW(
wide_path.as_ptr(),
FILE_READ_ATTRIBUTES,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
0,
core::ptr::null(),
OPEN_EXISTING,
flags,
std::ptr::null_mut(),
)
};
if handle != INVALID_HANDLE_VALUE {
if follow_links {
unsafe { CloseHandle(handle) };
return true;
}
let is_regular_reparse_point = _test_file_type_by_handle(handle, PY_IFRRP, false);
unsafe { CloseHandle(handle) };
return true;
if !is_regular_reparse_point {
return true;
}
let handle = unsafe {
CreateFileW(
wide_path.as_ptr(),
FILE_READ_ATTRIBUTES,
0,
core::ptr::null(),
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS,
std::ptr::null_mut(),
)
};
if handle != INVALID_HANDLE_VALUE {
unsafe { CloseHandle(handle) };
return true;
}
}
// Fallback for console devices like \\.\CON
let handle = unsafe {
CreateFileW(
wide_path.as_ptr(),
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
core::ptr::null(),
OPEN_EXISTING,
0,
std::ptr::null_mut(),
)
};
if handle != INVALID_HANDLE_VALUE {
unsafe { CloseHandle(handle) };
return true;
match unsafe { windows_sys::Win32::Foundation::GetLastError() } {
windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED
| windows_sys::Win32::Foundation::ERROR_SHARING_VIOLATION
| windows_sys::Win32::Foundation::ERROR_CANT_ACCESS_FILE
| windows_sys::Win32::Foundation::ERROR_INVALID_PARAMETER => {
let stat = crate::windows::win32_xstat(path.as_os_str(), follow_links);
return stat.is_ok();
}
_ => {}
}
false
@@ -988,9 +1136,10 @@ pub(crate) mod module {
let key_str = key.to_string_lossy();
let value_str = value.to_string_lossy();
// Validate: no '=' in key (search from index 1 because on Windows
// starting '=' is allowed for defining hidden environment variables)
if key_str.get(1..).is_some_and(|s| s.contains('=')) {
// Validate: empty key or '=' in key after position 0
// (search from index 1 because on Windows starting '=' is allowed
// for defining hidden environment variables)
if key_str.is_empty() || key_str.get(1..).is_some_and(|s| s.contains('=')) {
return Err(vm.new_value_error("illegal environment variable name"));
}
@@ -1108,9 +1257,10 @@ pub(crate) mod module {
if key_str.contains('\0') || value_str.contains('\0') {
return Err(vm.new_value_error("embedded null character"));
}
// Validate: no '=' in key (search from index 1 because on Windows
// starting '=' is allowed for defining hidden environment variables)
if key_str.get(1..).is_some_and(|s| s.contains('=')) {
// Validate: empty key or '=' in key after position 0
// (search from index 1 because on Windows starting '=' is allowed
// for defining hidden environment variables)
if key_str.is_empty() || key_str.get(1..).is_some_and(|s| s.contains('=')) {
return Err(vm.new_value_error("illegal environment variable name"));
}
@@ -1135,11 +1285,52 @@ pub(crate) mod module {
#[pyfunction]
fn _getfinalpathname(path: OsPath, vm: &VirtualMachine) -> PyResult {
let real = path
.as_ref()
.canonicalize()
.map_err(|e| e.to_pyexception(vm))?;
Ok(path.mode().process_path(real, vm))
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, GetFinalPathNameByHandleW, OPEN_EXISTING,
VOLUME_NAME_DOS,
};
let wide = path.to_wide_cstring(vm)?;
let handle = unsafe {
CreateFileW(
wide.as_ptr(),
0,
0,
core::ptr::null(),
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS,
std::ptr::null_mut(),
)
};
if handle == INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path, vm));
}
let mut buffer: Vec<u16> = vec![0; Foundation::MAX_PATH as usize];
let result = loop {
let ret = unsafe {
GetFinalPathNameByHandleW(
handle,
buffer.as_mut_ptr(),
buffer.len() as u32,
VOLUME_NAME_DOS,
)
};
if ret == 0 {
let err = io::Error::last_os_error();
let _ = unsafe { Foundation::CloseHandle(handle) };
return Err(OSErrorBuilder::with_filename(&err, path, vm));
}
if (ret as usize) < buffer.len() {
let final_path = std::ffi::OsString::from_wide(&buffer[..ret as usize]);
break Ok(path.mode().process_path(final_path, vm));
}
buffer.resize(ret as usize, 0);
};
unsafe { Foundation::CloseHandle(handle) };
result
}
#[pyfunction]
@@ -1155,7 +1346,8 @@ pub(crate) mod module {
)
};
if ret == 0 {
return Err(vm.new_last_os_error());
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path.clone(), vm));
}
if ret as usize > buffer.len() {
buffer.resize(ret as usize, 0);
@@ -1168,7 +1360,8 @@ pub(crate) mod module {
)
};
if ret == 0 {
return Err(vm.new_last_os_error());
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path.clone(), vm));
}
}
let buffer = widestring::WideCString::from_vec_truncate(buffer);
@@ -1179,12 +1372,16 @@ pub(crate) mod module {
fn _getvolumepathname(path: OsPath, vm: &VirtualMachine) -> PyResult {
let wide = path.to_wide_cstring(vm)?;
let buflen = std::cmp::max(wide.len(), Foundation::MAX_PATH as usize);
if buflen > u32::MAX as usize {
return Err(vm.new_overflow_error("path too long".to_owned()));
}
let mut buffer = vec![0u16; buflen];
let ret = unsafe {
FileSystem::GetVolumePathNameW(wide.as_ptr(), buffer.as_mut_ptr(), buflen as _)
};
if ret == 0 {
return Err(vm.new_last_os_error());
let err = io::Error::last_os_error();
return Err(OSErrorBuilder::with_filename(&err, path, vm));
}
let buffer = widestring::WideCString::from_vec_truncate(buffer);
Ok(path.mode().process_path(buffer.to_os_string(), vm))
@@ -1360,8 +1557,7 @@ pub(crate) mod module {
let hr = unsafe {
windows_sys::Win32::UI::Shell::PathCchSkipRoot(backslashed.as_ptr(), &mut end)
};
if hr == 0 {
// S_OK
if hr >= 0 {
assert!(!end.is_null());
let len: usize = unsafe { end.offset_from(backslashed.as_ptr()) }
.try_into()
@@ -1373,15 +1569,186 @@ pub(crate) mod module {
len,
backslashed.len()
);
(
Wtf8Buf::from_wide(&orig[..len]),
Wtf8Buf::from_wide(&orig[len..]),
)
if len != 0 {
(
Wtf8Buf::from_wide(&orig[..len]),
Wtf8Buf::from_wide(&orig[len..]),
)
} else {
(Wtf8Buf::from_wide(&orig), Wtf8Buf::new())
}
} else {
(Wtf8Buf::new(), Wtf8Buf::from_wide(&orig))
}
}
/// Normalize a wide-char path (faithful port of _Py_normpath_and_size).
/// Uses lastC tracking like the C implementation.
fn normpath_wide(path: &[u16]) -> Vec<u16> {
if path.is_empty() {
return vec![b'.' as u16];
}
const SEP: u16 = b'\\' as u16;
const ALTSEP: u16 = b'/' as u16;
const DOT: u16 = b'.' as u16;
let is_sep = |c: u16| c == SEP || c == ALTSEP;
let sep_or_end = |input: &[u16], idx: usize| idx >= input.len() || is_sep(input[idx]);
// Work on a mutable copy with normalized separators
let mut buf: Vec<u16> = path
.iter()
.map(|&c| if c == ALTSEP { SEP } else { c })
.collect();
let (drv_size, root_size) = skiproot(&buf);
let prefix_len = drv_size + root_size;
// p1 = read cursor, p2 = write cursor
let mut p1 = prefix_len;
let mut p2 = prefix_len;
let mut min_p2 = if prefix_len > 0 { prefix_len } else { 0 };
let mut last_c: u16 = if prefix_len > 0 {
min_p2 = prefix_len - 1;
let c = buf[min_p2];
// On Windows, if last char of prefix is not SEP, advance min_p2
if c != SEP {
min_p2 = prefix_len;
}
c
} else {
0
};
// Skip leading ".\" after prefix
if p1 < buf.len() && buf[p1] == DOT && sep_or_end(&buf, p1 + 1) {
p1 += 1;
last_c = SEP; // treat as if we consumed a separator
while p1 < buf.len() && buf[p1] == SEP {
p1 += 1;
}
}
while p1 < buf.len() {
let c = buf[p1];
if last_c == SEP {
if c == DOT {
let sep_at_1 = sep_or_end(&buf, p1 + 1);
let sep_at_2 = !sep_at_1 && sep_or_end(&buf, p1 + 2);
if sep_at_2 && buf[p1 + 1] == DOT {
// ".." component
let mut p3 = p2;
while p3 != min_p2 && buf[p3 - 1] == SEP {
p3 -= 1;
}
while p3 != min_p2 && buf[p3 - 1] != SEP {
p3 -= 1;
}
if p2 == min_p2
|| (buf[p3] == DOT
&& p3 + 1 < buf.len()
&& buf[p3 + 1] == DOT
&& (p3 + 2 >= buf.len() || buf[p3 + 2] == SEP))
{
// Previous segment is also ../ or at minimum
buf[p2] = DOT;
p2 += 1;
buf[p2] = DOT;
p2 += 1;
last_c = DOT;
} else if buf[p3] == SEP {
// Absolute path - absorb segment
p2 = p3 + 1;
// last_c stays SEP
} else {
p2 = p3;
// last_c stays SEP
}
p1 += 1; // skip second dot (first dot is current p1)
} else if sep_at_1 {
// "." component - skip
} else {
buf[p2] = c;
p2 += 1;
last_c = c;
}
} else if c == SEP {
// Collapse multiple separators - skip
} else {
buf[p2] = c;
p2 += 1;
last_c = c;
}
} else {
buf[p2] = c;
p2 += 1;
last_c = c;
}
p1 += 1;
}
// Null-terminate style: trim trailing separators
if p2 != min_p2 {
while p2 > min_p2 + 1 && buf[p2 - 1] == SEP {
p2 -= 1;
}
}
buf.truncate(p2);
if buf.is_empty() { vec![DOT] } else { buf }
}
#[pyfunction]
fn _path_normpath(path: crate::PyObjectRef, vm: &VirtualMachine) -> PyResult {
use crate::builtins::{PyBytes, PyStr};
use rustpython_common::wtf8::Wtf8Buf;
// Handle path-like objects via os.fspath
let path = if let Some(fspath) = vm.get_method(path.clone(), identifier!(vm, __fspath__)) {
fspath?.call((), vm)?
} else {
path
};
let (wide, is_bytes): (Vec<u16>, bool) = if let Some(s) = path.downcast_ref::<PyStr>() {
let wide: Vec<u16> = s.as_wtf8().encode_wide().collect();
(wide, false)
} else if let Some(b) = path.downcast_ref::<PyBytes>() {
let s = std::str::from_utf8(b.as_bytes()).map_err(|e| {
vm.new_exception_msg(
vm.ctx.exceptions.unicode_decode_error.to_owned(),
format!(
"'utf-8' codec can't decode byte {:#x} in position {}: invalid start byte",
b.as_bytes().get(e.valid_up_to()).copied().unwrap_or(0),
e.valid_up_to()
),
)
})?;
let wide: Vec<u16> = s.encode_utf16().collect();
(wide, true)
} else {
return Err(vm.new_type_error(format!(
"expected str or bytes, not {}",
path.class().name()
)));
};
let normalized = normpath_wide(&wide);
if is_bytes {
let s = String::from_utf16(&normalized)
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
Ok(vm.ctx.new_bytes(s.into_bytes()).into())
} else {
let s = Wtf8Buf::from_wide(&normalized);
Ok(vm.ctx.new_str(s).into())
}
}
#[pyfunction]
fn _getdiskusage(path: OsPath, vm: &VirtualMachine) -> PyResult<(u64, u64)> {
use FileSystem::GetDiskFreeSpaceExW;
@@ -1659,15 +2026,22 @@ pub(crate) mod module {
#[pyfunction]
fn pipe(vm: &VirtualMachine) -> PyResult<(i32, i32)> {
use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
use windows_sys::Win32::System::Pipes::CreatePipe;
let mut attr = SECURITY_ATTRIBUTES {
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
lpSecurityDescriptor: core::ptr::null_mut(),
bInheritHandle: 0,
};
let (read_handle, write_handle) = unsafe {
let mut read = MaybeUninit::<isize>::uninit();
let mut write = MaybeUninit::<isize>::uninit();
let res = CreatePipe(
read.as_mut_ptr() as *mut _,
write.as_mut_ptr() as *mut _,
core::ptr::null(),
&mut attr as *mut _,
0,
);
if res == 0 {
@@ -1885,10 +2259,7 @@ pub(crate) mod module {
// PathBuffer starts at offset 16
(sub_offset, sub_length, 16usize)
} else {
// Unknown reparse tag - fall back to std::fs::read_link
let link_path = fs::read_link(path.as_ref())
.map_err(|e| crate::convert::ToPyException::to_pyexception(&e, vm))?;
return Ok(mode.process_path(link_path, vm));
return Err(vm.new_value_error("not a symbolic link".to_owned()));
};
// Extract the substitute name
@@ -1906,17 +2277,20 @@ pub(crate) mod module {
.map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
.collect();
let mut result_path = std::ffi::OsString::from_wide(&wide_chars);
let mut wide_chars = wide_chars;
// For mount points (junctions), the substitute name typically starts with \??\
// Convert this to \\?\
let result_str = result_path.to_string_lossy();
if let Some(stripped) = result_str.strip_prefix(r"\??\") {
// Replace \??\ with \\?\
let new_path = format!(r"\\?\{}", stripped);
result_path = std::ffi::OsString::from(new_path);
if wide_chars.len() > 4
&& wide_chars[0] == b'\\' as u16
&& wide_chars[1] == b'?' as u16
&& wide_chars[2] == b'?' as u16
&& wide_chars[3] == b'\\' as u16
{
wide_chars[1] = b'\\' as u16;
}
let result_path = std::ffi::OsString::from_wide(&wide_chars);
Ok(mode.process_path(std::path::PathBuf::from(result_path), vm))
}

View File

@@ -1098,7 +1098,7 @@ pub mod module {
OsPath::try_from_object(vm, v)?.into_bytes(),
);
if memchr::memchr(b'=', &key).is_some() {
if key.is_empty() || memchr::memchr(b'=', &key).is_some() {
return Err(vm.new_value_error("illegal environment variable name"));
}