Update bz2.py from 3.13.5 (#6055)

This commit is contained in:
Shahar Naveh
2025-08-01 15:16:32 +02:00
committed by GitHub
parent 2d65c7f859
commit 710941c27f
3 changed files with 211 additions and 14 deletions

18
Lib/bz2.py vendored
View File

@@ -17,7 +17,7 @@ import _compression
from _bz2 import BZ2Compressor, BZ2Decompressor
_MODE_CLOSED = 0
# Value 0 no longer used
_MODE_READ = 1
# Value 2 no longer used
_MODE_WRITE = 3
@@ -54,7 +54,7 @@ class BZ2File(_compression.BaseStream):
"""
self._fp = None
self._closefp = False
self._mode = _MODE_CLOSED
self._mode = None
if not (1 <= compresslevel <= 9):
raise ValueError("compresslevel must be between 1 and 9")
@@ -100,7 +100,7 @@ class BZ2File(_compression.BaseStream):
May be called more than once without error. Once the file is
closed, any other operation on it will raise a ValueError.
"""
if self._mode == _MODE_CLOSED:
if self.closed:
return
try:
if self._mode == _MODE_READ:
@@ -115,13 +115,21 @@ class BZ2File(_compression.BaseStream):
finally:
self._fp = None
self._closefp = False
self._mode = _MODE_CLOSED
self._buffer = None
@property
def closed(self):
"""True if this file is closed."""
return self._mode == _MODE_CLOSED
return self._fp is None
@property
def name(self):
self._check_not_closed()
return self._fp.name
@property
def mode(self):
return 'wb' if self._mode == _MODE_WRITE else 'rb'
def fileno(self):
"""Return the file descriptor for the underlying file."""

2
Lib/lzma.py vendored
View File

@@ -128,7 +128,7 @@ class LZMAFile(_compression.BaseStream):
if self._mode == _MODE_READ:
raw = _compression.DecompressReader(self._fp, LZMADecompressor,
trailing_error=LZMAError, format=format, filters=filters)
trailing_error=LZMAError, format=format, filters=filters)
self._buffer = io.BufferedReader(raw)
def close(self):

205
Lib/test/test_bz2.py vendored
View File

@@ -3,19 +3,19 @@ from test.support import bigmemtest, _4G
import array
import unittest
import io
from io import BytesIO, DEFAULT_BUFFER_SIZE
import os
import pickle
import glob
import tempfile
import pathlib
import random
import shutil
import subprocess
import threading
from test.support import import_helper
from test.support import threading_helper
from test.support.os_helper import unlink
from test.support.os_helper import unlink, FakePath
import _compression
import sys
@@ -476,7 +476,6 @@ class BZ2FileTest(BaseTest):
self.assertEqual(xlines, [b'Test'])
def testContextProtocol(self):
f = None
with BZ2File(self.filename, "wb") as f:
f.write(b"xxx")
f = BZ2File(self.filename, "rb")
@@ -537,26 +536,210 @@ class BZ2FileTest(BaseTest):
with BZ2File(self.filename) as bz2f:
self.assertEqual(bz2f.read(), data1 + data2)
def testOpenFilename(self):
with BZ2File(self.filename, "wb") as f:
f.write(b'content')
self.assertEqual(f.name, self.filename)
self.assertIsInstance(f.fileno(), int)
self.assertEqual(f.mode, 'wb')
self.assertIs(f.readable(), False)
self.assertIs(f.writable(), True)
self.assertIs(f.seekable(), False)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'wb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
with BZ2File(self.filename, "ab") as f:
f.write(b'appendix')
self.assertEqual(f.name, self.filename)
self.assertIsInstance(f.fileno(), int)
self.assertEqual(f.mode, 'wb')
self.assertIs(f.readable(), False)
self.assertIs(f.writable(), True)
self.assertIs(f.seekable(), False)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'wb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
with BZ2File(self.filename, 'rb') as f:
self.assertEqual(f.read(), b'contentappendix')
self.assertEqual(f.name, self.filename)
self.assertIsInstance(f.fileno(), int)
self.assertEqual(f.mode, 'rb')
self.assertIs(f.readable(), True)
self.assertIs(f.writable(), False)
self.assertIs(f.seekable(), True)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'rb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
def testOpenFileWithName(self):
with open(self.filename, 'wb') as raw:
with BZ2File(raw, 'wb') as f:
f.write(b'content')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'wb')
self.assertIs(f.readable(), False)
self.assertIs(f.writable(), True)
self.assertIs(f.seekable(), False)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'wb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
with open(self.filename, 'ab') as raw:
with BZ2File(raw, 'ab') as f:
f.write(b'appendix')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'wb')
self.assertIs(f.readable(), False)
self.assertIs(f.writable(), True)
self.assertIs(f.seekable(), False)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'wb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
with open(self.filename, 'rb') as raw:
with BZ2File(raw, 'rb') as f:
self.assertEqual(f.read(), b'contentappendix')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'rb')
self.assertIs(f.readable(), True)
self.assertIs(f.writable(), False)
self.assertIs(f.seekable(), True)
self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
self.assertEqual(f.mode, 'rb')
self.assertRaises(ValueError, f.readable)
self.assertRaises(ValueError, f.writable)
self.assertRaises(ValueError, f.seekable)
def testOpenFileWithoutName(self):
bio = BytesIO()
with BZ2File(bio, 'wb') as f:
f.write(b'content')
with self.assertRaises(AttributeError):
f.name
self.assertRaises(io.UnsupportedOperation, f.fileno)
self.assertEqual(f.mode, 'wb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
with BZ2File(bio, 'ab') as f:
f.write(b'appendix')
with self.assertRaises(AttributeError):
f.name
self.assertRaises(io.UnsupportedOperation, f.fileno)
self.assertEqual(f.mode, 'wb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
bio.seek(0)
with BZ2File(bio, 'rb') as f:
self.assertEqual(f.read(), b'contentappendix')
with self.assertRaises(AttributeError):
f.name
self.assertRaises(io.UnsupportedOperation, f.fileno)
self.assertEqual(f.mode, 'rb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
def testOpenFileWithIntName(self):
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
with open(fd, 'wb') as raw:
with BZ2File(raw, 'wb') as f:
f.write(b'content')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'wb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
with open(fd, 'ab') as raw:
with BZ2File(raw, 'ab') as f:
f.write(b'appendix')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'wb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
fd = os.open(self.filename, os.O_RDONLY)
with open(fd, 'rb') as raw:
with BZ2File(raw, 'rb') as f:
self.assertEqual(f.read(), b'contentappendix')
self.assertEqual(f.name, raw.name)
self.assertEqual(f.fileno(), raw.fileno())
self.assertEqual(f.mode, 'rb')
with self.assertRaises(ValueError):
f.name
self.assertRaises(ValueError, f.fileno)
def testOpenBytesFilename(self):
str_filename = self.filename
try:
bytes_filename = str_filename.encode("ascii")
except UnicodeEncodeError:
self.skipTest("Temporary file name needs to be ASCII")
bytes_filename = os.fsencode(str_filename)
with BZ2File(bytes_filename, "wb") as f:
f.write(self.DATA)
self.assertEqual(f.name, bytes_filename)
with BZ2File(bytes_filename, "rb") as f:
self.assertEqual(f.read(), self.DATA)
self.assertEqual(f.name, bytes_filename)
# Sanity check that we are actually operating on the right file.
with BZ2File(str_filename, "rb") as f:
self.assertEqual(f.read(), self.DATA)
self.assertEqual(f.name, str_filename)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def testOpenPathLikeFilename(self):
filename = pathlib.Path(self.filename)
filename = FakePath(self.filename)
with BZ2File(filename, "wb") as f:
f.write(self.DATA)
self.assertEqual(f.name, self.filename)
with BZ2File(filename, "rb") as f:
self.assertEqual(f.read(), self.DATA)
self.assertEqual(f.name, self.filename)
def testDecompressLimited(self):
"""Decompressed data buffering should be limited"""
@@ -577,6 +760,9 @@ class BZ2FileTest(BaseTest):
with BZ2File(bio) as bz2f:
self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(), self.TEXT)
with self.assertRaises(AttributeError):
bz2.name
self.assertEqual(bz2f.mode, 'rb')
self.assertFalse(bio.closed)
def testPeekBytesIO(self):
@@ -592,6 +778,9 @@ class BZ2FileTest(BaseTest):
with BZ2File(bio, "w") as bz2f:
self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT)
with self.assertRaises(AttributeError):
bz2.name
self.assertEqual(bz2f.mode, 'wb')
self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
self.assertFalse(bio.closed)