Compare commits

...

18 Commits

Author SHA1 Message Date
f1863cc40f change ci 2025-03-15 16:54:31 +09:00
5f4ab448cb Update .github/workflows/ai-review.yml 2025-03-15 16:53:22 +09:00
8a646957eb Update .github/workflows/ai-review.yml 2025-03-15 16:53:22 +09:00
9aa703e07d Update .github/workflows/ai-review.yml 2025-03-15 16:53:22 +09:00
287cfea18a Add ci 2025-03-15 16:53:22 +09:00
Jeong, YunWon
bd94d8d50c Remove uu.py and test_uu.py (#5607)
* Remove uu.py and test_uu.py

* patch email.message
2025-03-14 11:42:26 +09:00
Ashwin Naren
7fab64ed9c Revert "Update statistics to 3.13.2 (#5592)" (#5606)
This reverts commit ff970b0e1c.
2025-03-14 11:41:14 +09:00
Ashwin Naren
8e22c399df partially fix sys.getwindowsversion() (#5595) 2025-03-14 11:38:35 +09:00
Ashwin Naren
7546ea91a9 patch email.message 2025-03-13 10:16:51 -07:00
Ashwin Naren
8da66978bf Remove uu.py and test_uu.py 2025-03-13 09:49:10 -07:00
Ashwin Naren
8484bfa2e0 Remove cgi module (#5597)
* no cgi

* mark failing test
2025-03-12 09:47:34 +09:00
Ashwin Naren
ff970b0e1c Update statistics to 3.13.2 (#5592)
* statistics to 3.13.2

* set flaky test
2025-03-11 22:37:26 +09:00
Jeong, YunWon
8be7e4327d Merge pull request #5596 from arihant2math/osx-support-313
_osx_support update to 3.13.2
2025-03-11 15:58:19 +09:00
Jeong, YunWon
82eeb237dc Merge pull request #5598 from arihant2math/fix-whats-left-again
Fixed whats left
2025-03-11 15:57:57 +09:00
Ashwin Naren
cbbadf562f Fixed whats left 2025-03-10 23:27:05 -07:00
Ashwin Naren
4308321f39 _osx_support update to 3.13 2025-03-10 23:13:32 -07:00
Jeong, YunWon
985eebf9b0 Merge pull request #5589 from youknowone/pyattr-const
Replace pyattr(once) to constant
2025-03-11 10:12:50 +09:00
Jeong YunWon
87fae150da Replace pyattr(once) to constant 2025-03-09 12:39:12 +09:00
15 changed files with 748 additions and 2323 deletions

304
.github/scripts/code_review.py vendored Normal file
View File

@@ -0,0 +1,304 @@
"""Code Reviewer for Gitea."""
import asyncio
import fnmatch
import json
import os
import re
from typing import Any
import requests
import aiohttp
from model import Model
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN", "")
HEADERS = {"Authorization": f"token {ACCESS_TOKEN}"}
GITHUB_EVENT_PATH = os.getenv("GITHUB_EVENT_PATH")
try:
with open(GITHUB_EVENT_PATH, "r") as f:
EVENT_DATA = json.load(f)
except FileNotFoundError:
print("Failed to load event data.")
exit(1)
FULL_CONTEXT_MODEL_NAME = os.getenv("FULL_CONTEXT_MODEL", "")
SINGLE_CHUNK_MODEL_NAME = os.getenv("SINGLE_CHUNK_MODEL", "")
FULL_CONTEXT_API_KEY = os.getenv("FULL_CONTEXT_API_KEY", "")
SINGLE_CHUNK_API_KEY = os.getenv("SINGLE_CHUNK_API_KEY", "")
EXCLUDE_PATTERNS = os.getenv("EXCLUDE", "").split(",")
def get_diff() -> str | None:
"""Get code difference between base and head from Gitea.
Returns:
str | None: code difference between base and head, or None if failed to get diff
"""
url = EVENT_DATA["pull_request"]["diff_url"]
try:
response = requests.get(url, headers=HEADERS)
response.raise_for_status()
return response.text
except requests.RequestException as e:
print(f"Failed to get diff: {e}")
return None
def parse_diff(diff: str) -> list[dict[str, Any]]:
"""Parse diff into list of dicts.
Args:
diff: str, code difference between base and head
Returns:
list[dict[str, Any]]: list of dicts, each dict represents a code chunks
"""
file_pattern = re.compile(
r"(?s)diff --git a/(.+?) b/(.*?)\r?\n(.*?)(?=diff --git a/|$)", re.S
)
old_new_pattern = re.compile(r"(?m)^(---|\+\+\+)\s+(.*)$")
chunk_range_pattern = re.compile(
r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*?)?(?=@@|\Z)",
re.MULTILINE | re.DOTALL,
)
list_diff = []
for match in file_pattern.finditer(diff):
diff_text = match.group(3)
old_new_match = list(old_new_pattern.finditer(diff_text))
if len(old_new_match) != 2:
continue
old_file = old_new_match[0].group(2)
old_file = old_file.lstrip("a/") if old_file.startswith("a/") else old_file
new_file = old_new_match[1].group(2)
if new_file == "/dev/null":
print("Neglict deleted file")
continue
new_file = new_file.lstrip("b/")
if any(fnmatch.fnmatch(new_file, pattern) for pattern in EXCLUDE_PATTERNS):
print(f"Exclude file {new_file}")
continue
output_diff_text = []
for chunk_range_match in chunk_range_pattern.finditer(diff_text):
old_idx = int(chunk_range_match.group(1))
new_idx = int(chunk_range_match.group(3))
for line in chunk_range_match.group(5).splitlines():
if line.startswith("-"):
output_diff_text.append(f"{old_idx} None {line}")
old_idx += 1
elif line.startswith("+"):
output_diff_text.append(f"None {new_idx} {line}")
new_idx += 1
else:
output_diff_text.append(f"{old_idx} {new_idx} {line}")
old_idx += 1
new_idx += 1
output_diff_text = "\n".join(output_diff_text)
list_diff.append(
{
"file": new_file,
"chunk": output_diff_text,
}
)
return list_diff
def create_comment(
file: str, ai_response: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Create comments for single chunk review.
Args:
file: str, file name
ai_response: list[dict[str, Any]], AI response for single chunk review
Returns:
list[dict[str, Any]]: comments for single chunk review
"""
comments = []
for ai_response in ai_response:
comments.append(
{
"body": f"[REVIEW] {ai_response['reviewComment']}",
"path": file,
"new_position": int(ai_response["lineNumber"]),
}
)
return comments
async def analyze_single_chunks(
single_chunk_model: Model, parsed_diff: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Analyze single chunks and create comments.
Args:
single_chunk_model: AI Session for single chunk analysis
parsed_diff: list[dict[str, Any]], parsed diff
Returns:
list[dict[str, Any]]: comments for single chunk review
"""
async def process_single_chunk(diff: dict[str, Any]):
file = diff["file"]
chunk = diff["chunk"]
response = await single_chunk_model.get_response_single_chunk(
file, title, description, chunk
)
response = response.strip("`").lstrip("json").strip() or "[]"
try:
response_json = json.loads(response)
return create_comment(file, response_json)
except json.JSONDecodeError:
print(f"Failed to parse response: {response}")
return []
title = EVENT_DATA["pull_request"]["title"]
description = EVENT_DATA["pull_request"]["body"]
tasks = [process_single_chunk(diff) for diff in parsed_diff]
results = await asyncio.gather(*tasks)
# Flatten the list of comments
comments = [comment for result in results for comment in result]
return comments
async def get_file_content(file: str) -> str | None:
"""Get file content from Gitea.
Args:
file: str, file name
Returns:
str | None: file content, or None if failed to get file content
"""
repo_url = EVENT_DATA["pull_request"]["head"]["repo"]["url"]
branch = EVENT_DATA["pull_request"]["head"]["ref"]
replaced_file = file.replace("/", "%2F")
url = f"{repo_url}/raw/{branch}%2F{replaced_file}?ref={branch}"
try:
async with aiohttp.ClientSession(headers=HEADERS) as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
except aiohttp.ClientError as e: # More specific exception handling
print(f"Network error fetching {file}: {e}")
except asyncio.TimeoutError:
print(f"Timeout fetching {file}")
return None
async def analyze_full_context(
full_context_model: Model, parsed_diff: list[dict[str, Any]]
) -> str:
"""Analyze full context and create review.
Args:
full_context_model: AI Session for full context analysis
parsed_diff: list[dict[str, Any]], parsed diff
Returns:
str: review for full context
"""
async def get_file_data(diff: dict[str, Any]):
file = diff["file"]
chunk = diff["chunk"]
content = get_file_content(file)
if content is None:
return None
return f"File: {file}\n{content}\nDiff: {chunk}"
tasks = [get_file_data(diff) for diff in parsed_diff]
file_contents_list = await asyncio.gather(*tasks)
file_contents = [item for item in file_contents_list if item is not None]
if not file_contents:
return ""
title = EVENT_DATA["pull_request"]["title"]
description = EVENT_DATA["pull_request"]["body"]
response = await full_context_model.get_response_full_context(
title, description, file_contents
)
response = response.strip("`").lstrip("markdown").strip()
return response
def post_review(
full_context_review: str, single_chunk_comments: list[dict[str, Any]]
) -> None:
"""Post review to Gitea.
Args:
full_context_review: str, review for full context
single_chunk_comments: list[dict[str, Any]], comments for single chunk review
"""
repo_url = EVENT_DATA["pull_request"]["head"]["repo"]["url"]
pull_number = EVENT_DATA["number"]
commit_id = EVENT_DATA["pull_request"]["head"]["sha"]
url = f"{repo_url}/pulls/{pull_number}/reviews"
data = {
"body": full_context_review,
"event": "COMMENT",
"comments": single_chunk_comments,
"commit_id": commit_id,
}
response = requests.post(url, headers=HEADERS, json=data)
response.raise_for_status()
async def main() -> None:
"""Code Reviewer for Gitea: Asynchronous version."""
if EVENT_DATA["action"] not in ["opened", "synchronized"]:
print("Unsupported event.")
return
diff = get_diff()
if diff is None:
return
elif not diff:
print("No diff found.")
return
full_context_model = Model(
model=FULL_CONTEXT_MODEL_NAME,
api_key=FULL_CONTEXT_API_KEY,
is_full_context=True,
)
single_chunk_model = Model(
model=SINGLE_CHUNK_MODEL_NAME,
api_key=SINGLE_CHUNK_API_KEY,
is_full_context=False,
)
parsed_diff = parse_diff(diff)
comments_task = asyncio.create_task(
analyze_single_chunks(single_chunk_model, parsed_diff)
)
if EVENT_DATA["action"] == "opened":
full_context_response_task = asyncio.create_task(
analyze_full_context(full_context_model, parsed_diff)
)
full_context_response = await full_context_response_task
else:
full_context_response = ""
comments = await comments_task
post_review(full_context_response, comments)
if __name__ == "__main__":
asyncio.run(main())

261
.github/scripts/model.py vendored Normal file
View File

@@ -0,0 +1,261 @@
"""Model for code review."""
from enum import Enum
from typing import Any
import google.generativeai as genai
import typing_extensions as typing
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
class GoogleResponse(typing.TypedDict):
"""The response from Google model."""
lineNumber: int
reviewComment: str
class ModelProvider(Enum):
"""The model provider."""
OPENAI = "openai"
ANTHROPIC = "anthropic"
GOOGLE = "google"
DEEPSEEK = "deepseek"
@classmethod
def from_model(cls, model: str) -> "ModelProvider":
"""Get the model provider from the model name.
Args:
model (str): The model name.
Returns:
ModelProvider: The model provider.
"""
for prefix, provider in PREFIX_TO_MODEL.items():
if model.startswith(prefix):
return provider
raise ValueError(f"Unknown model: {model}")
PREFIX_TO_MODEL = {
"gpt": ModelProvider.OPENAI,
"o1": ModelProvider.OPENAI,
"o3": ModelProvider.OPENAI,
"claude": ModelProvider.ANTHROPIC,
"gemini": ModelProvider.GOOGLE,
"deepseek": ModelProvider.DEEPSEEK,
}
class Model:
"""The model class.
Attributes:
model (str): The model name.
api_key (str): The API key.
system_prompt (str): The system prompt.
max_tokens (int): The maximum tokens.
"""
def __init__( # noqa: D107
self,
model: str,
api_key: str,
is_full_context: bool,
max_tokens: int = 4196,
):
self.model = model
self.system_prompt = (
FULL_CONTEXT_SYSTEM_PROMPT
if is_full_context
else SINGLE_CHUNK_SYSTEM_PROMPT
)
self.max_tokens = max_tokens
self.provider = ModelProvider.from_model(model)
self.session = self.create_session(api_key)
def create_session(self, api_key: str) -> Any:
"""Create a session for the model.
Args:
api_key (str): The API key.
Returns:
Any: The session.
"""
match self.provider:
case ModelProvider.OPENAI:
return AsyncOpenAI(api_key=api_key)
case ModelProvider.ANTHROPIC:
return AsyncAnthropic(api_key=api_key)
case ModelProvider.GOOGLE:
genai.configure(api_key=api_key)
return genai.GenerativeModel(
model_name=self.model, system_instruction=self.system_prompt
)
case ModelProvider.DEEPSEEK:
return AsyncOpenAI(api_key=api_key, base_url="https://api.deepseek.com")
async def request(self, prompt: str) -> str:
"""Request the model to generate a response.
Args:
prompt (str): The prompt to generate a response for.
Returns:
str: The generated response.
"""
match self.provider:
case ModelProvider.OPENAI | ModelProvider.DEEPSEEK:
response = await self.session.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": prompt},
],
temperature=0.2,
max_tokens=self.max_tokens,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
)
return response.choices[0].message.content.strip()
case ModelProvider.ANTHROPIC:
response = await self.session.messages.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
system=[
{
"type": "text",
"text": self.system_prompt,
"cache_control": {"type": "ephemeral"},
}
],
temperature=0.2,
max_tokens=self.max_tokens,
)
return response.content[0].text.strip()
case ModelProvider.GOOGLE:
response = await self.session.generate_content_async(
prompt,
generation_config=genai.GenerationConfig(
response_mime_type="application/json",
response_schema=list[GoogleResponse],
),
)
return response.text.strip()
async def get_response_single_chunk(
self, file: str, title: str, description: str, chunk: str
) -> str:
"""Get the response for a single chunk.
Args:
file (str): The file name.
title (str): The pull request title.
description (str): The pull request description.
chunk (str): The diff chunk.
Returns:
str: The response.
"""
prompt = SINGLE_CHUNK_USER_PROMPT.format(file, title, description, chunk)
return await self.request(prompt)
async def get_response_full_context(
self, title: str, description: str, file_contents: list[str]
) -> str:
"""Get the response for full context.
Args:
title (str): The pull request title.
description (str): The pull request description.
file_contents (list[str]): The file contents, diffs.
Returns:
str: The response.
"""
try:
prompt = FULL_CONTEXT_USER_PROMPT.format(
title, description, "\n".join(file_contents)
)
return await self.request(prompt)
except Exception as e:
print(f"Error during full context response: {e}")
print(prompt)
return None
SINGLE_CHUNK_SYSTEM_PROMPT = (
"Your task is to review pull requests. Instructions:\n"
"- Provide the response in the following JSON format: "
"""[{{"lineNumber": int, "reviewComment": str}}] \n"""
"- lineNumber is about the line number of the code that in new file. \n"
"- lineNumber can be found at the front of each line. \n"
"- At the first number is old line number, the second number is new line number. \n"
"- If the line starts with `+`, it means the line is added. \n"
"- If the line starts with `-`, it means the line is deleted. \n"
"- Evaluate whether the code changes and additions are appropriate "
"and if the new code structure is suitable. \n"
"- Do not give positive comments or compliments. \n"
"- Provide comments and suggestions ONLY if there is something to improve"
"otherwise return an empty array. \n"
"- Write the comment in GitHub Markdown format. \n"
"- Use the given description only for the overall context "
"and only comment the code. \n"
"- Do not suggest type hint or naming convention. \n"
"- IMPORTANT: NEVER suggest adding comments to the code. \n"
)
SINGLE_CHUNK_USER_PROMPT = (
"Review the following code diff in the file "
"{} and take the pull request title and description into account "
"when writing the response. \n"
"Pull request title: {} \n"
"Pull request description: \n"
"--- \n"
"{} \n"
"--- \n"
"Git diff to review: \n"
"```diff \n"
"{} \n"
"```"
)
FULL_CONTEXT_SYSTEM_PROMPT = (
"You are an experienced software engineer specializing in reviewing pull "
"requests. Your task is to provide an overall code review summary for a PR. "
"Focus on assessing the following aspects:\n"
"1. **Code Structure & Architecture:** "
"Evaluate whether the code is well-organized, modular, "
"and adheres to clean code principles. Suggest improvements if needed.\n"
"2. **Refactoring Opportunities:** "
"Identify areas where the code can be optimized or simplified without changing "
"its behavior.\n"
"3. **Potential Future Problems:** "
"Highlight possible scalability, maintainability, or dependency issues that might "
"arise in the future based on the current implementation.\n"
"Be constructive and clear in your feedback. Avoid commenting on trivial issues "
"or syntax errors—focus on high-level feedback.\n"
"Precise instructions:\n"
"- Do not give positive comments or compliments.\n"
"- Provide comments and suggestions ONLY if there is something to improve, "
"otherwise return an empty string.\n"
"- Write the comment in GitHub Markdown format.\n"
"- Do not start with 'markdown' or '```markdown'.\n"
"- IMPORTANT: Give example code block or pseudo code if you can.\n"
)
FULL_CONTEXT_USER_PROMPT = (
"Review the following code and take the pull request title "
"and description into account when writing the response. \n"
"Pull request title: {} \n"
"Pull request description: \n"
"--- \n"
"{} \n"
"--- \n"
"Code to review: \n"
"{}"
)

View File

@@ -1,42 +1,14 @@
on: on:
push:
branches: [main, release]
pull_request: pull_request:
branches: [dev]
types: [unlabeled, opened, synchronize, reopened] types: [unlabeled, opened, synchronize, reopened]
merge_group: merge_group:
workflow_dispatch: workflow_dispatch:
name: CI name: Dev-CI
# Cancel previous workflows if they are the same workflow on same ref (branch/tags)
# with the same event (push/pull_request) even they are in progress.
# This setting will help reduce the number of duplicated workflows.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
cancel-in-progress: true
env: env:
CARGO_ARGS: --no-default-features --features stdlib,importlib,encodings,sqlite,ssl CARGO_ARGS: --no-default-features --features stdlib,zlib,importlib,encodings,sqlite,ssl
# Skip additional tests on Windows. They are checked on Linux and MacOS.
# test_glob: many failing tests
# test_io: many failing tests
# test_os: many failing tests
# test_pathlib: support.rmtree() failing
# test_posixpath: OSError: (22, 'The filename, directory name, or volume label syntax is incorrect. (os error 123)')
# test_venv: couple of failing tests
WINDOWS_SKIPS: >-
test_glob
test_io
test_os
test_rlcompleter
test_pathlib
test_posixpath
test_venv
# configparser: https://github.com/RustPython/RustPython/issues/4995#issuecomment-1582397417
# socketserver: seems related to configparser crash.
MACOS_SKIPS: >-
test_configparser
test_socketserver
# PLATFORM_INDEPENDENT_TESTS are tests that do not depend on the underlying OS. They are currently # PLATFORM_INDEPENDENT_TESTS are tests that do not depend on the underlying OS. They are currently
# only run on Linux to speed up the CI. # only run on Linux to speed up the CI.
PLATFORM_INDEPENDENT_TESTS: >- PLATFORM_INDEPENDENT_TESTS: >-
@@ -117,11 +89,7 @@ jobs:
env: env:
RUST_BACKTRACE: full RUST_BACKTRACE: full
name: Run rust tests name: Run rust tests
runs-on: ${{ matrix.os }} runs-on: ubuntu-latest
strategy:
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
fail-fast: false
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable - uses: dtolnay/rust-toolchain@stable
@@ -129,26 +97,12 @@ jobs:
components: clippy components: clippy
- uses: Swatinem/rust-cache@v2 - uses: Swatinem/rust-cache@v2
- name: Set up the Windows environment
shell: bash
run: |
cargo install --target-dir=target -v cargo-vcpkg
cargo vcpkg -v build
if: runner.os == 'Windows'
- name: Set up the Mac environment
run: brew install autoconf automake libtool
if: runner.os == 'macOS'
- name: run clippy - name: run clippy
run: cargo clippy ${{ env.CARGO_ARGS }} --workspace --exclude rustpython_wasm -- -Dwarnings run: cargo clippy ${{ env.CARGO_ARGS }} --workspace --exclude rustpython_wasm -- -Dwarnings
- name: run rust tests - name: run rust tests
run: cargo test --workspace --exclude rustpython_wasm --verbose --features threading ${{ env.CARGO_ARGS }} run: cargo test --workspace --exclude rustpython_wasm --verbose --features threading ${{ env.CARGO_ARGS }}
if: runner.os != 'macOS'
- name: run rust tests
run: cargo test --workspace --exclude rustpython_wasm --exclude rustpython-jit --verbose --features threading ${{ env.CARGO_ARGS }}
if: runner.os == 'macOS'
- name: check compilation without threading - name: check compilation without threading
run: cargo check ${{ env.CARGO_ARGS }} run: cargo check ${{ env.CARGO_ARGS }}
@@ -156,24 +110,6 @@ jobs:
run: run:
cargo run --manifest-path example_projects/barebone/Cargo.toml cargo run --manifest-path example_projects/barebone/Cargo.toml
cargo run --manifest-path example_projects/frozen_stdlib/Cargo.toml cargo run --manifest-path example_projects/frozen_stdlib/Cargo.toml
if: runner.os == 'Linux'
- name: prepare AppleSilicon build
uses: dtolnay/rust-toolchain@stable
with:
target: aarch64-apple-darwin
if: runner.os == 'macOS'
- name: Check compilation for Apple Silicon
run: cargo check --target aarch64-apple-darwin
if: runner.os == 'macOS'
- name: prepare iOS build
uses: dtolnay/rust-toolchain@stable
with:
target: aarch64-apple-ios
if: runner.os == 'macOS'
- name: Check compilation for iOS
run: cargo check --target aarch64-apple-ios
if: runner.os == 'macOS'
exotic_targets: exotic_targets:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }} if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
@@ -240,11 +176,7 @@ jobs:
env: env:
RUST_BACKTRACE: full RUST_BACKTRACE: full
name: Run snippets and cpython tests name: Run snippets and cpython tests
runs-on: ${{ matrix.os }} runs-on: ubuntu-latest
strategy:
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
fail-fast: false
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable - uses: dtolnay/rust-toolchain@stable
@@ -252,54 +184,29 @@ jobs:
- uses: actions/setup-python@v5 - uses: actions/setup-python@v5
with: with:
python-version: ${{ env.PYTHON_VERSION }} python-version: ${{ env.PYTHON_VERSION }}
- name: Set up the Windows environment
shell: bash
run: |
cargo install cargo-vcpkg
cargo vcpkg build
if: runner.os == 'Windows'
- name: Set up the Mac environment
run: brew install autoconf automake libtool openssl@3
if: runner.os == 'macOS'
- name: build rustpython
run: cargo build --release --verbose --features=threading ${{ env.CARGO_ARGS }}
if: runner.os == 'macOS'
- name: build rustpython - name: build rustpython
run: cargo build --release --verbose --features=threading ${{ env.CARGO_ARGS }},jit run: cargo build --release --verbose --features=threading ${{ env.CARGO_ARGS }},jit
if: runner.os != 'macOS'
- uses: actions/setup-python@v5 - uses: actions/setup-python@v5
with: with:
python-version: ${{ env.PYTHON_VERSION }} python-version: ${{ env.PYTHON_VERSION }}
- name: run snippets - name: run snippets
run: python -m pip install -r requirements.txt && pytest -v run: python -m pip install -r requirements.txt && pytest -v
working-directory: ./extra_tests working-directory: ./extra_tests
- if: runner.os == 'Linux' - name: run cpython platform-independent tests
name: run cpython platform-independent tests
run: run:
target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v ${{ env.PLATFORM_INDEPENDENT_TESTS }} target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v ${{ env.PLATFORM_INDEPENDENT_TESTS }}
- if: runner.os == 'Linux' - name: run cpython platform-dependent tests (Linux)
name: run cpython platform-dependent tests (Linux)
run: target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} run: target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }}
- if: runner.os == 'macOS' - name: check that --install-pip succeeds
name: run cpython platform-dependent tests (MacOS)
run: target/release/rustpython -m test -j 1 --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} ${{ env.MACOS_SKIPS }}
- if: runner.os == 'Windows'
name: run cpython platform-dependent tests (windows partial - fixme)
run:
target/release/rustpython -m test -j 1 --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} ${{ env.WINDOWS_SKIPS }}
- if: runner.os != 'Windows'
name: check that --install-pip succeeds
run: | run: |
mkdir site-packages mkdir site-packages
target/release/rustpython --install-pip ensurepip --user target/release/rustpython --install-pip ensurepip --user
target/release/rustpython -m pip install six target/release/rustpython -m pip install six
- if: runner.os != 'Windows' - name: Check that ensurepip succeeds.
name: Check that ensurepip succeeds.
run: | run: |
target/release/rustpython -m ensurepip target/release/rustpython -m ensurepip
target/release/rustpython -c "import pip" target/release/rustpython -c "import pip"
- if: runner.os != 'Windows' - name: Check if pip inside venv is functional
name: Check if pip inside venv is functional
run: | run: |
target/release/rustpython -m venv testvenv target/release/rustpython -m venv testvenv
testvenv/bin/rustpython -m pip install wheel testvenv/bin/rustpython -m pip install wheel
@@ -348,84 +255,3 @@ jobs:
# a memory leak, at least until we have proper cyclic gc # a memory leak, at least until we have proper cyclic gc
run: MIRIFLAGS='-Zmiri-ignore-leaks' cargo +nightly miri test -p rustpython-vm -- miri_test run: MIRIFLAGS='-Zmiri-ignore-leaks' cargo +nightly miri test -p rustpython-vm -- miri_test
wasm:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
name: Check the WASM package and demo
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: install geckodriver
run: |
wget https://github.com/mozilla/geckodriver/releases/download/v0.36.0/geckodriver-v0.36.0-linux64.tar.gz
mkdir geckodriver
tar -xzf geckodriver-v0.36.0-linux64.tar.gz -C geckodriver
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
- run: python -m pip install -r requirements.txt
working-directory: ./wasm/tests
- uses: actions/setup-node@v4
with:
cache: "npm"
cache-dependency-path: "wasm/demo/package-lock.json"
- name: run test
run: |
export PATH=$PATH:`pwd`/../../geckodriver
npm install
npm run test
env:
NODE_OPTIONS: "--openssl-legacy-provider"
working-directory: ./wasm/demo
- uses: mwilliamson/setup-wabt-action@v3
with: { wabt-version: "1.0.36" }
- name: check wasm32-unknown without js
run: |
cd wasm/wasm-unknown-test
cargo build --release --verbose
if wasm-objdump -xj Import target/wasm32-unknown-unknown/release/wasm_unknown_test.wasm; then
echo "ERROR: wasm32-unknown module expects imports from the host environment" >2
fi
- name: build notebook demo
if: github.ref == 'refs/heads/release'
run: |
npm install
npm run dist
mv dist ../demo/dist/notebook
env:
NODE_OPTIONS: "--openssl-legacy-provider"
working-directory: ./wasm/notebook
- name: Deploy demo to Github Pages
if: success() && github.ref == 'refs/heads/release'
uses: peaceiris/actions-gh-pages@v4
env:
ACTIONS_DEPLOY_KEY: ${{ secrets.ACTIONS_DEMO_DEPLOY_KEY }}
PUBLISH_DIR: ./wasm/demo/dist
EXTERNAL_REPOSITORY: RustPython/demo
PUBLISH_BRANCH: master
wasm-wasi:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
name: Run snippets and cpython tests on wasm-wasi
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
target: wasm32-wasip1
- uses: Swatinem/rust-cache@v2
- name: Setup Wasmer
uses: wasmerio/setup-wasmer@v3
- name: Install clang
run: sudo apt-get update && sudo apt-get install clang -y
- name: build rustpython
run: cargo build --release --target wasm32-wasip1 --features freeze-stdlib,stdlib --verbose
- name: run snippets
run: wasmer run --dir `pwd` target/wasm32-wasip1/release/rustpython.wasm -- `pwd`/extra_tests/snippets/stdlib_random.py
- name: run cpython unittest
run: wasmer run --dir `pwd` target/wasm32-wasip1/release/rustpython.wasm -- `pwd`/Lib/test/test_int.py

36
.github/workflows/code-review.yml vendored Normal file
View File

@@ -0,0 +1,36 @@
name: Code Review
on:
pull_request:
branches: [dev]
types: [opened, synchronize]
permissions:
contents: read
pull-requests: write
jobs:
review:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install aiohttp requests py-gitea openai anthropic google-generativeai
- name: Run Code Review
env:
ACCESS_TOKEN: ${{ secrets.ACCESS_TOKEN }}
FULL_CONTEXT_MODEL: o3-mini
FULL_CONTEXT_API_KEY: ${{ secrets.OPENAI_API_KEY }}
SINGLE_CHUNK_MODEL: gemini-2.0-flash-exp
SINGLE_CHUNK_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
EXCLUDE: "*.yml,*.yaml"
run: python .gitea/scripts/code_review.py

View File

@@ -101,7 +101,7 @@ jobs:
[ -f ./_data/whats_left.temp ] && cp ./_data/whats_left.temp ./_data/whats_left_lastrun.temp [ -f ./_data/whats_left.temp ] && cp ./_data/whats_left.temp ./_data/whats_left_lastrun.temp
cp ../whats_left.temp ./_data/whats_left.temp cp ../whats_left.temp ./_data/whats_left.temp
rm ./_data/whats_left/modules.csv rm ./_data/whats_left/modules.csv
echo -e "modules" > ./_data/whats_left/modules.csv echo -e "module" > ./_data/whats_left/modules.csv
cat ./_data/whats_left.temp | grep "(entire module)" | cut -d ' ' -f 1 | sort >> ./_data/whats_left/modules.csv cat ./_data/whats_left.temp | grep "(entire module)" | cut -d ' ' -f 1 | sort >> ./_data/whats_left/modules.csv
git add -A git add -A
if git -c user.name="Github Actions" -c user.email="actions@github.com" commit -m "Update what is left results" --author="$GITHUB_ACTOR"; then if git -c user.name="Github Actions" -c user.email="actions@github.com" commit -m "Update what is left results" --author="$GITHUB_ACTOR"; then

5
Lib/_osx_support.py vendored
View File

@@ -507,6 +507,11 @@ def get_platform_osx(_config_vars, osname, release, machine):
# MACOSX_DEPLOYMENT_TARGET. # MACOSX_DEPLOYMENT_TARGET.
macver = _config_vars.get('MACOSX_DEPLOYMENT_TARGET', '') macver = _config_vars.get('MACOSX_DEPLOYMENT_TARGET', '')
if macver and '.' not in macver:
# Ensure that the version includes at least a major
# and minor version, even if MACOSX_DEPLOYMENT_TARGET
# is set to a single-label version like "14".
macver += '.0'
macrelease = _get_system_version() or macver macrelease = _get_system_version() or macver
macver = macver or macrelease macver = macver or macrelease

1012
Lib/cgi.py vendored

File diff suppressed because it is too large Load Diff

39
Lib/email/message.py vendored
View File

@@ -7,7 +7,6 @@
__all__ = ['Message', 'EmailMessage'] __all__ = ['Message', 'EmailMessage']
import re import re
import uu
import quopri import quopri
from io import BytesIO, StringIO from io import BytesIO, StringIO
@@ -101,6 +100,35 @@ def _unquotevalue(value):
return utils.unquote(value) return utils.unquote(value)
def _decode_uu(encoded):
"""Decode uuencoded data."""
decoded_lines = []
encoded_lines_iter = iter(encoded.splitlines())
for line in encoded_lines_iter:
if line.startswith(b"begin "):
mode, _, path = line.removeprefix(b"begin ").partition(b" ")
try:
int(mode, base=8)
except ValueError:
continue
else:
break
else:
raise ValueError("`begin` line not found")
for line in encoded_lines_iter:
if not line:
raise ValueError("Truncated input")
elif line.strip(b' \t\r\n\f') == b'end':
break
try:
decoded_line = binascii.a2b_uu(line)
except binascii.Error:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((line[0]-32) & 63) * 4 + 5) // 3
decoded_line = binascii.a2b_uu(line[:nbytes])
decoded_lines.append(decoded_line)
return b''.join(decoded_lines)
class Message: class Message:
"""Basic message object. """Basic message object.
@@ -288,13 +316,10 @@ class Message:
self.policy.handle_defect(self, defect) self.policy.handle_defect(self, defect)
return value return value
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'): elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
in_file = BytesIO(bpayload)
out_file = BytesIO()
try: try:
uu.decode(in_file, out_file, quiet=True) return _decode_uu(bpayload)
return out_file.getvalue() except ValueError:
except uu.Error: # Some decoding problem.
# Some decoding problem
return bpayload return bpayload
if isinstance(payload, str): if isinstance(payload, str):
return bpayload return bpayload

645
Lib/test/test_cgi.py vendored
View File

@@ -1,645 +0,0 @@
import os
import sys
import tempfile
import unittest
from collections import namedtuple
from io import StringIO, BytesIO
from test import support
from test.support import warnings_helper
cgi = warnings_helper.import_deprecated("cgi")
class HackedSysModule:
# The regression test will have real values in sys.argv, which
# will completely confuse the test of the cgi module
argv = []
stdin = sys.stdin
cgi.sys = HackedSysModule()
class ComparableException:
def __init__(self, err):
self.err = err
def __str__(self):
return str(self.err)
def __eq__(self, anExc):
if not isinstance(anExc, Exception):
return NotImplemented
return (self.err.__class__ == anExc.__class__ and
self.err.args == anExc.args)
def __getattr__(self, attr):
return getattr(self.err, attr)
def do_test(buf, method):
env = {}
if method == "GET":
fp = None
env['REQUEST_METHOD'] = 'GET'
env['QUERY_STRING'] = buf
elif method == "POST":
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
env['CONTENT_LENGTH'] = str(len(buf))
else:
raise ValueError("unknown method: %s" % method)
try:
return cgi.parse(fp, env, strict_parsing=1)
except Exception as err:
return ComparableException(err)
parse_strict_test_cases = [
("", {}),
("&", ValueError("bad query field: ''")),
("&&", ValueError("bad query field: ''")),
# Should the next few really be valid?
("=", {}),
("=&=", {}),
# This rest seem to make sense
("=a", {'': ['a']}),
("&=a", ValueError("bad query field: ''")),
("=a&", ValueError("bad query field: ''")),
("=&a", ValueError("bad query field: 'a'")),
("b=a", {'b': ['a']}),
("b+=a", {'b ': ['a']}),
("a=b=a", {'a': ['b=a']}),
("a=+b=a", {'a': [' b=a']}),
("&b=a", ValueError("bad query field: ''")),
("b&=a", ValueError("bad query field: 'b'")),
("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
{'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
'cuyer': ['r'],
'expire': ['964546263'],
'kid': ['130003.300038'],
'lobale': ['en-US'],
'order_id': ['0bb2e248638833d48cb7fed300000f1b'],
'ss': ['env'],
'view': ['bustomer'],
}),
("group_id=5470&set=custom&_assigned_to=31392&_status=1&_category=100&SUBMIT=Browse",
{'SUBMIT': ['Browse'],
'_assigned_to': ['31392'],
'_category': ['100'],
'_status': ['1'],
'group_id': ['5470'],
'set': ['custom'],
})
]
def norm(seq):
return sorted(seq, key=repr)
def first_elts(list):
return [p[0] for p in list]
def first_second_elts(list):
return [(p[0], p[1][0]) for p in list]
def gen_result(data, environ):
encoding = 'latin-1'
fake_stdin = BytesIO(data.encode(encoding))
fake_stdin.seek(0)
form = cgi.FieldStorage(fp=fake_stdin, environ=environ, encoding=encoding)
result = {}
for k, v in dict(form).items():
result[k] = isinstance(v, list) and form.getlist(k) or v.value
return result
class CgiTests(unittest.TestCase):
def test_parse_multipart(self):
fp = BytesIO(POSTDATA.encode('latin1'))
env = {'boundary': BOUNDARY.encode('latin1'),
'CONTENT-LENGTH': '558'}
result = cgi.parse_multipart(fp, env)
expected = {'submit': [' Add '], 'id': ['1234'],
'file': [b'Testing 123.\n'], 'title': ['']}
self.assertEqual(result, expected)
def test_parse_multipart_without_content_length(self):
POSTDATA = '''--JfISa01
Content-Disposition: form-data; name="submit-name"
just a string
--JfISa01--
'''
fp = BytesIO(POSTDATA.encode('latin1'))
env = {'boundary': 'JfISa01'.encode('latin1')}
result = cgi.parse_multipart(fp, env)
expected = {'submit-name': ['just a string\n']}
self.assertEqual(result, expected)
# TODO RUSTPYTHON - see https://github.com/RustPython/RustPython/issues/935
@unittest.expectedFailure
def test_parse_multipart_invalid_encoding(self):
BOUNDARY = "JfISa01"
POSTDATA = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 3
\u2603
--JfISa01"""
fp = BytesIO(POSTDATA.encode('utf8'))
env = {'boundary': BOUNDARY.encode('latin1'),
'CONTENT-LENGTH': str(len(POSTDATA.encode('utf8')))}
result = cgi.parse_multipart(fp, env, encoding="ascii",
errors="surrogateescape")
expected = {'submit-name': ["\udce2\udc98\udc83"]}
self.assertEqual(result, expected)
self.assertEqual("\u2603".encode('utf8'),
result["submit-name"][0].encode('utf8', 'surrogateescape'))
def test_fieldstorage_properties(self):
fs = cgi.FieldStorage()
self.assertFalse(fs)
self.assertIn("FieldStorage", repr(fs))
self.assertEqual(list(fs), list(fs.keys()))
fs.list.append(namedtuple('MockFieldStorage', 'name')('fieldvalue'))
self.assertTrue(fs)
def test_fieldstorage_invalid(self):
self.assertRaises(TypeError, cgi.FieldStorage, "not-a-file-obj",
environ={"REQUEST_METHOD":"PUT"})
self.assertRaises(TypeError, cgi.FieldStorage, "foo", "bar")
fs = cgi.FieldStorage(headers={'content-type':'text/plain'})
self.assertRaises(TypeError, bool, fs)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_strict(self):
for orig, expect in parse_strict_test_cases:
# Test basic parsing
d = do_test(orig, "GET")
self.assertEqual(d, expect, "Error parsing %s method GET" % repr(orig))
d = do_test(orig, "POST")
self.assertEqual(d, expect, "Error parsing %s method POST" % repr(orig))
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(environ=env)
if isinstance(expect, dict):
# test dict interface
self.assertEqual(len(expect), len(fs))
self.assertCountEqual(expect.keys(), fs.keys())
##self.assertEqual(norm(expect.values()), norm(fs.values()))
##self.assertEqual(norm(expect.items()), norm(fs.items()))
self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
# test individual fields
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
def test_separator(self):
parse_semicolon = [
("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
(";", ValueError("bad query field: ''")),
(";;", ValueError("bad query field: ''")),
("=;a", ValueError("bad query field: 'a'")),
(";b=a", ValueError("bad query field: ''")),
("b;=a", ValueError("bad query field: 'b'")),
("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
]
for orig, expect in parse_semicolon:
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(separator=';', environ=env)
if isinstance(expect, dict):
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_log(self):
cgi.log("Testing")
cgi.logfp = StringIO()
cgi.initlog("%s", "Testing initlog 1")
cgi.log("%s", "Testing log 2")
self.assertEqual(cgi.logfp.getvalue(), "Testing initlog 1\nTesting log 2\n")
if os.path.exists(os.devnull):
cgi.logfp = None
cgi.logfile = os.devnull
cgi.initlog("%s", "Testing log 3")
self.addCleanup(cgi.closelog)
cgi.log("Testing log 4")
def test_fieldstorage_readline(self):
# FieldStorage uses readline, which has the capacity to read all
# contents of the input file into memory; we use readline's size argument
# to prevent that for files that do not contain any newlines in
# non-GET/HEAD requests
class TestReadlineFile:
def __init__(self, file):
self.file = file
self.numcalls = 0
def readline(self, size=None):
self.numcalls += 1
if size:
return self.file.readline(size)
else:
return self.file.readline()
def __getattr__(self, name):
file = self.__dict__['file']
a = getattr(file, name)
if not isinstance(a, int):
setattr(self, name, a)
return a
f = TestReadlineFile(tempfile.TemporaryFile("wb+"))
self.addCleanup(f.close)
f.write(b'x' * 256 * 1024)
f.seek(0)
env = {'REQUEST_METHOD':'PUT'}
fs = cgi.FieldStorage(fp=f, environ=env)
self.addCleanup(fs.file.close)
# if we're not chunking properly, readline is only called twice
# (by read_binary); if we are chunking properly, it will be called 5 times
# as long as the chunksize is 1 << 16.
self.assertGreater(f.numcalls, 2)
f.close()
def test_fieldstorage_multipart(self):
#Test basic FieldStorage multipart parsing
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': '558'}
fp = BytesIO(POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 4)
expect = [{'name':'id', 'filename':None, 'value':'1234'},
{'name':'title', 'filename':None, 'value':''},
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
{'name':'submit', 'filename':None, 'value':' Add '}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_leading_whitespace(self):
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': '560'}
# Add some leading whitespace to our post data that will cause the
# first line to not be the innerboundary.
fp = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 4)
expect = [{'name':'id', 'filename':None, 'value':'1234'},
{'name':'title', 'filename':None, 'value':''},
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
{'name':'submit', 'filename':None, 'value':' Add '}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_non_ascii(self):
#Test basic FieldStorage multipart parsing
env = {'REQUEST_METHOD':'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH':'558'}
for encoding in ['iso-8859-1','utf-8']:
fp = BytesIO(POSTDATA_NON_ASCII.encode(encoding))
fs = cgi.FieldStorage(fp, environ=env,encoding=encoding)
self.assertEqual(len(fs.list), 1)
expect = [{'name':'id', 'filename':None, 'value':'\xe7\xf1\x80'}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_maxline(self):
# Issue #18167
maxline = 1 << 16
self.maxDiff = None
def check(content):
data = """---123
Content-Disposition: form-data; name="upload"; filename="fake.txt"
Content-Type: text/plain
%s
---123--
""".replace('\n', '\r\n') % content
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'REQUEST_METHOD': 'POST',
}
self.assertEqual(gen_result(data, environ),
{'upload': content.encode('latin1')})
check('x' * (maxline - 1))
check('x' * (maxline - 1) + '\r')
check('x' * (maxline - 1) + '\r' + 'y' * (maxline - 1))
def test_fieldstorage_multipart_w3c(self):
# Test basic FieldStorage multipart parsing (W3C sample)
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
'CONTENT_LENGTH': str(len(POSTDATA_W3))}
fp = BytesIO(POSTDATA_W3.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 2)
self.assertEqual(fs.list[0].name, 'submit-name')
self.assertEqual(fs.list[0].value, 'Larry')
self.assertEqual(fs.list[1].name, 'files')
files = fs.list[1].value
self.assertEqual(len(files), 2)
expect = [{'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
{'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'}]
for x in range(len(files)):
for k, exp in expect[x].items():
got = getattr(files[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_part_content_length(self):
BOUNDARY = "JfISa01"
POSTDATA = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 5
Larry
--JfISa01"""
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': str(len(POSTDATA))}
fp = BytesIO(POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 1)
self.assertEqual(fs.list[0].name, 'submit-name')
self.assertEqual(fs.list[0].value, 'Larry')
def test_field_storage_multipart_no_content_length(self):
fp = BytesIO(b"""--MyBoundary
Content-Disposition: form-data; name="my-arg"; filename="foo"
Test
--MyBoundary--
""")
env = {
"REQUEST_METHOD": "POST",
"CONTENT_TYPE": "multipart/form-data; boundary=MyBoundary",
"wsgi.input": fp,
}
fields = cgi.FieldStorage(fp, environ=env)
self.assertEqual(len(fields["my-arg"].file.read()), 5)
def test_fieldstorage_as_context_manager(self):
fp = BytesIO(b'x' * 10)
env = {'REQUEST_METHOD': 'PUT'}
with cgi.FieldStorage(fp=fp, environ=env) as fs:
content = fs.file.read()
self.assertFalse(fs.file.closed)
self.assertTrue(fs.file.closed)
self.assertEqual(content, 'x' * 10)
with self.assertRaisesRegex(ValueError, 'I/O operation on closed file'):
fs.file.read()
_qs_result = {
'key1': 'value1',
'key2': ['value2x', 'value2y'],
'key3': 'value3',
'key4': 'value4'
}
def testQSAndUrlEncode(self):
data = "key2=value2x&key3=value3&key4=value4"
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': 'key1=value1&key2=value2y',
'REQUEST_METHOD': 'POST',
}
v = gen_result(data, environ)
self.assertEqual(self._qs_result, v)
def test_max_num_fields(self):
# For application/x-www-form-urlencoded
data = '&'.join(['a=a']*11)
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'REQUEST_METHOD': 'POST',
}
with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=10,
)
# For multipart/form-data
data = """---123
Content-Disposition: form-data; name="a"
3
---123
Content-Type: application/x-www-form-urlencoded
a=4
---123
Content-Type: application/x-www-form-urlencoded
a=5
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'a=1&a=2',
'REQUEST_METHOD': 'POST',
}
# 2 GET entities
# 1 top level POST entities
# 1 entity within the second POST entity
# 1 entity within the third POST entity
with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=4,
)
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=5,
)
def testQSAndFormData(self):
data = """---123
Content-Disposition: form-data; name="key2"
value2y
---123
Content-Disposition: form-data; name="key3"
value3
---123
Content-Disposition: form-data; name="key4"
value4
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'key1=value1&key2=value2x',
'REQUEST_METHOD': 'POST',
}
v = gen_result(data, environ)
self.assertEqual(self._qs_result, v)
def testQSAndFormDataFile(self):
data = """---123
Content-Disposition: form-data; name="key2"
value2y
---123
Content-Disposition: form-data; name="key3"
value3
---123
Content-Disposition: form-data; name="key4"
value4
---123
Content-Disposition: form-data; name="upload"; filename="fake.txt"
Content-Type: text/plain
this is the content of the fake file
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'key1=value1&key2=value2x',
'REQUEST_METHOD': 'POST',
}
result = self._qs_result.copy()
result.update({
'upload': b'this is the content of the fake file\n'
})
v = gen_result(data, environ)
self.assertEqual(result, v)
def test_parse_header(self):
self.assertEqual(
cgi.parse_header("text/plain"),
("text/plain", {}))
self.assertEqual(
cgi.parse_header("text/vnd.just.made.this.up ; "),
("text/vnd.just.made.this.up", {}))
self.assertEqual(
cgi.parse_header("text/plain;charset=us-ascii"),
("text/plain", {"charset": "us-ascii"}))
self.assertEqual(
cgi.parse_header('text/plain ; charset="us-ascii"'),
("text/plain", {"charset": "us-ascii"}))
self.assertEqual(
cgi.parse_header('text/plain ; charset="us-ascii"; another=opt'),
("text/plain", {"charset": "us-ascii", "another": "opt"}))
self.assertEqual(
cgi.parse_header('attachment; filename="silly.txt"'),
("attachment", {"filename": "silly.txt"}))
self.assertEqual(
cgi.parse_header('attachment; filename="strange;name"'),
("attachment", {"filename": "strange;name"}))
self.assertEqual(
cgi.parse_header('attachment; filename="strange;name";size=123;'),
("attachment", {"filename": "strange;name", "size": "123"}))
self.assertEqual(
cgi.parse_header('form-data; name="files"; filename="fo\\"o;bar"'),
("form-data", {"name": "files", "filename": 'fo"o;bar'}))
def test_all(self):
not_exported = {
"logfile", "logfp", "initlog", "dolog", "nolog", "closelog", "log",
"maxlen", "valid_boundary"}
support.check__all__(self, cgi, not_exported=not_exported)
BOUNDARY = "---------------------------721837373350705526688164684"
POSTDATA = """-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="id"
1234
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="title"
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="file"; filename="test.txt"
Content-Type: text/plain
Testing 123.
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="submit"
Add\x20
-----------------------------721837373350705526688164684--
"""
POSTDATA_NON_ASCII = """-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="id"
\xe7\xf1\x80
-----------------------------721837373350705526688164684
"""
# http://www.w3.org/TR/html401/interact/forms.html#h-17.13.4
BOUNDARY_W3 = "AaB03x"
POSTDATA_W3 = """--AaB03x
Content-Disposition: form-data; name="submit-name"
Larry
--AaB03x
Content-Disposition: form-data; name="files"
Content-Type: multipart/mixed; boundary=BbC04y
--BbC04y
Content-Disposition: file; filename="file1.txt"
Content-Type: text/plain
... contents of file1.txt ...
--BbC04y
Content-Disposition: file; filename="file2.gif"
Content-Type: image/gif
Content-Transfer-Encoding: binary
...contents of file2.gif...
--BbC04y--
--AaB03x--
"""
if __name__ == '__main__':
unittest.main()

View File

@@ -778,6 +778,7 @@ class CGIHTTPServerTestCase(BaseTestCase):
# TODO: RUSTPYTHON # TODO: RUSTPYTHON
@unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON; works only on windows") @unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON; works only on windows")
@unittest.expectedFailure
def test_post(self): def test_post(self):
params = urllib.parse.urlencode( params = urllib.parse.urlencode(
{'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})

258
Lib/test/test_uu.py vendored
View File

@@ -1,258 +0,0 @@
"""
Tests for uu module.
Nick Mathewson
"""
import unittest
from test.support import os_helper
import os
import stat
import sys
import uu
import io
plaintext = b"The symbols on top of your keyboard are !@#$%^&*()_+|~\n"
encodedtext = b"""\
M5&AE('-Y;6)O;',@;VX@=&]P(&]F('EO=7(@:V5Y8F]A<F0@87)E("% (R0E
*7B8J*"E?*WQ^"@ """
# Stolen from io.py
class FakeIO(io.TextIOWrapper):
"""Text I/O implementation using an in-memory buffer.
Can be a used as a drop-in replacement for sys.stdin and sys.stdout.
"""
# XXX This is really slow, but fully functional
def __init__(self, initial_value="", encoding="utf-8",
errors="strict", newline="\n"):
super(FakeIO, self).__init__(io.BytesIO(),
encoding=encoding,
errors=errors,
newline=newline)
self._encoding = encoding
self._errors = errors
if initial_value:
if not isinstance(initial_value, str):
initial_value = str(initial_value)
self.write(initial_value)
self.seek(0)
def getvalue(self):
self.flush()
return self.buffer.getvalue().decode(self._encoding, self._errors)
def encodedtextwrapped(mode, filename, backtick=False):
if backtick:
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
encodedtext.replace(b' ', b'`') + b"\n`\nend\n")
else:
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
encodedtext + b"\n \nend\n")
return res
class UUTest(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_encode(self):
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1")
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1"))
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1", 0o644)
self.assertEqual(out.getvalue(), encodedtextwrapped(0o644, "t1"))
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1", backtick=True)
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1", True))
with self.assertRaises(TypeError):
uu.encode(inp, out, "t1", 0o644, True)
def test_decode(self):
for backtick in True, False:
inp = io.BytesIO(encodedtextwrapped(0o666, "t1", backtick=backtick))
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
inp = io.BytesIO(
b"UUencoded files may contain many lines,\n" +
b"even some that have 'begin' in them.\n" +
encodedtextwrapped(0o666, "t1", backtick=backtick)
)
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
def test_truncatedinput(self):
inp = io.BytesIO(b"begin 644 t1\n" + encodedtext)
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception raised")
except uu.Error as e:
self.assertEqual(str(e), "Truncated input file")
def test_missingbegin(self):
inp = io.BytesIO(b"")
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception raised")
except uu.Error as e:
self.assertEqual(str(e), "No valid begin line found in input file")
def test_garbage_padding(self):
# Issue #22406
encodedtext1 = (
b"begin 644 file\n"
# length 1; bits 001100 111111 111111 111111
b"\x21\x2C\x5F\x5F\x5F\n"
b"\x20\n"
b"end\n"
)
encodedtext2 = (
b"begin 644 file\n"
# length 1; bits 001100 111111 111111 111111
b"\x21\x2C\x5F\x5F\x5F\n"
b"\x60\n"
b"end\n"
)
plaintext = b"\x33" # 00110011
for encodedtext in encodedtext1, encodedtext2:
with self.subTest("uu.decode()"):
inp = io.BytesIO(encodedtext)
out = io.BytesIO()
uu.decode(inp, out, quiet=True)
self.assertEqual(out.getvalue(), plaintext)
with self.subTest("uu_codec"):
import codecs
decoded = codecs.decode(encodedtext, "uu_codec")
self.assertEqual(decoded, plaintext)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_newlines_escaped(self):
# Test newlines are escaped with uu.encode
inp = io.BytesIO(plaintext)
out = io.BytesIO()
filename = "test.txt\n\roverflow.txt"
safefilename = b"test.txt\\n\\roverflow.txt"
uu.encode(inp, out, filename)
self.assertIn(safefilename, out.getvalue())
class UUStdIOTest(unittest.TestCase):
def setUp(self):
self.stdin = sys.stdin
self.stdout = sys.stdout
def tearDown(self):
sys.stdin = self.stdin
sys.stdout = self.stdout
def test_encode(self):
sys.stdin = FakeIO(plaintext.decode("ascii"))
sys.stdout = FakeIO()
uu.encode("-", "-", "t1", 0o666)
self.assertEqual(sys.stdout.getvalue(),
encodedtextwrapped(0o666, "t1").decode("ascii"))
def test_decode(self):
sys.stdin = FakeIO(encodedtextwrapped(0o666, "t1").decode("ascii"))
sys.stdout = FakeIO()
uu.decode("-", "-")
stdout = sys.stdout
sys.stdout = self.stdout
sys.stdin = self.stdin
self.assertEqual(stdout.getvalue(), plaintext.decode("ascii"))
class UUFileTest(unittest.TestCase):
def setUp(self):
# uu.encode() supports only ASCII file names
self.tmpin = os_helper.TESTFN_ASCII + "i"
self.tmpout = os_helper.TESTFN_ASCII + "o"
self.addCleanup(os_helper.unlink, self.tmpin)
self.addCleanup(os_helper.unlink, self.tmpout)
def test_encode(self):
with open(self.tmpin, 'wb') as fin:
fin.write(plaintext)
with open(self.tmpin, 'rb') as fin:
with open(self.tmpout, 'wb') as fout:
uu.encode(fin, fout, self.tmpin, mode=0o644)
with open(self.tmpout, 'rb') as fout:
s = fout.read()
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
# in_file and out_file as filenames
uu.encode(self.tmpin, self.tmpout, self.tmpin, mode=0o644)
with open(self.tmpout, 'rb') as fout:
s = fout.read()
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
def test_decode(self):
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
with open(self.tmpin, 'rb') as f:
uu.decode(f)
with open(self.tmpout, 'rb') as f:
s = f.read()
self.assertEqual(s, plaintext)
# XXX is there an xp way to verify the mode?
def test_decode_filename(self):
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
uu.decode(self.tmpin)
with open(self.tmpout, 'rb') as f:
s = f.read()
self.assertEqual(s, plaintext)
def test_decodetwice(self):
# Verify that decode() will refuse to overwrite an existing file
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
with open(self.tmpin, 'rb') as f:
uu.decode(f)
with open(self.tmpin, 'rb') as f:
self.assertRaises(uu.Error, uu.decode, f)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_decode_mode(self):
# Verify that decode() will set the given mode for the out_file
expected_mode = 0o444
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(expected_mode, self.tmpout))
# make file writable again, so it can be removed (Windows only)
self.addCleanup(os.chmod, self.tmpout, expected_mode | stat.S_IWRITE)
with open(self.tmpin, 'rb') as f:
uu.decode(f)
self.assertEqual(
stat.S_IMODE(os.stat(self.tmpout).st_mode),
expected_mode
)
if __name__=="__main__":
unittest.main()

199
Lib/uu.py vendored
View File

@@ -1,199 +0,0 @@
#! /usr/bin/env python3
# Copyright 1994 by Lance Ellinghouse
# Cathedral City, California Republic, United States of America.
# All Rights Reserved
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Lance Ellinghouse
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Modified by Jack Jansen, CWI, July 1995:
# - Use binascii module to do the actual line-by-line conversion
# between ascii and binary. This results in a 1000-fold speedup. The C
# version is still 5 times faster, though.
# - Arguments more compliant with python standard
"""Implementation of the UUencode and UUdecode functions.
encode(in_file, out_file [,name, mode])
decode(in_file [, out_file, mode])
"""
import binascii
import os
import sys
__all__ = ["Error", "encode", "decode"]
class Error(Exception):
pass
def encode(in_file, out_file, name=None, mode=None):
"""Uuencode file"""
#
# If in_file is a pathname open it and change defaults
#
opened_files = []
try:
if in_file == '-':
in_file = sys.stdin.buffer
elif isinstance(in_file, str):
if name is None:
name = os.path.basename(in_file)
if mode is None:
try:
mode = os.stat(in_file).st_mode
except AttributeError:
pass
in_file = open(in_file, 'rb')
opened_files.append(in_file)
#
# Open out_file if it is a pathname
#
if out_file == '-':
out_file = sys.stdout.buffer
elif isinstance(out_file, str):
out_file = open(out_file, 'wb')
opened_files.append(out_file)
#
# Set defaults for name and mode
#
if name is None:
name = '-'
if mode is None:
mode = 0o666
#
# Write the data
#
out_file.write(('begin %o %s\n' % ((mode & 0o777), name)).encode("ascii"))
data = in_file.read(45)
while len(data) > 0:
out_file.write(binascii.b2a_uu(data))
data = in_file.read(45)
out_file.write(b' \nend\n')
finally:
for f in opened_files:
f.close()
def decode(in_file, out_file=None, mode=None, quiet=False):
"""Decode uuencoded file"""
#
# Open the input file, if needed.
#
opened_files = []
if in_file == '-':
in_file = sys.stdin.buffer
elif isinstance(in_file, str):
in_file = open(in_file, 'rb')
opened_files.append(in_file)
try:
#
# Read until a begin is encountered or we've exhausted the file
#
while True:
hdr = in_file.readline()
if not hdr:
raise Error('No valid begin line found in input file')
if not hdr.startswith(b'begin'):
continue
hdrfields = hdr.split(b' ', 2)
if len(hdrfields) == 3 and hdrfields[0] == b'begin':
try:
int(hdrfields[1], 8)
break
except ValueError:
pass
if out_file is None:
# If the filename isn't ASCII, what's up with that?!?
out_file = hdrfields[2].rstrip(b' \t\r\n\f').decode("ascii")
if os.path.exists(out_file):
raise Error('Cannot overwrite existing file: %s' % out_file)
if mode is None:
mode = int(hdrfields[1], 8)
#
# Open the output file
#
if out_file == '-':
out_file = sys.stdout.buffer
elif isinstance(out_file, str):
fp = open(out_file, 'wb')
try:
os.path.chmod(out_file, mode)
except AttributeError:
pass
out_file = fp
opened_files.append(out_file)
#
# Main decoding loop
#
s = in_file.readline()
while s and s.strip(b' \t\r\n\f') != b'end':
try:
data = binascii.a2b_uu(s)
except binascii.Error as v:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
data = binascii.a2b_uu(s[:nbytes])
if not quiet:
sys.stderr.write("Warning: %s\n" % v)
out_file.write(data)
s = in_file.readline()
if not s:
raise Error('Truncated input file')
finally:
for f in opened_files:
f.close()
def test():
"""uuencode/uudecode main program"""
import optparse
parser = optparse.OptionParser(usage='usage: %prog [-d] [-t] [input [output]]')
parser.add_option('-d', '--decode', dest='decode', help='Decode (instead of encode)?', default=False, action='store_true')
parser.add_option('-t', '--text', dest='text', help='data is text, encoded format unix-compatible text?', default=False, action='store_true')
(options, args) = parser.parse_args()
if len(args) > 2:
parser.error('incorrect number of arguments')
sys.exit(1)
# Use the binary streams underlying stdin/stdout
input = sys.stdin.buffer
output = sys.stdout.buffer
if len(args) > 0:
input = args[0]
if len(args) > 1:
output = args[1]
if options.decode:
if options.text:
if isinstance(output, str):
output = open(output, 'wb')
else:
print(sys.argv[0], ': cannot do -t to stdout')
sys.exit(1)
decode(input, output)
else:
if options.text:
if isinstance(input, str):
input = open(input, 'rb')
else:
print(sys.argv[0], ': cannot do -t from stdin')
sys.exit(1)
encode(input, output)
if __name__ == '__main__':
test()

View File

@@ -81,7 +81,7 @@ if sys.platform.startswith("win"):
0x00000100 | 0x00000001 | 0x00000020 | 0x00002000 | 0x00000010 | 0x00008000 | 0x00020000 0x00000100 | 0x00000001 | 0x00000020 | 0x00002000 | 0x00000010 | 0x00008000 | 0x00020000
# We really can't test if the results are correct, so it just checks for meaningful value # We really can't test if the results are correct, so it just checks for meaningful value
assert winver.major > 0 assert winver.major > 6
assert winver.minor >= 0 assert winver.minor >= 0
assert winver.build > 0 assert winver.build > 0
assert winver.platform == 2 assert winver.platform == 2
@@ -91,8 +91,8 @@ if sys.platform.startswith("win"):
# XXX if platform_version is implemented correctly, this'll break on compatiblity mode or a build without manifest # XXX if platform_version is implemented correctly, this'll break on compatiblity mode or a build without manifest
# these fields can mismatch in CPython # these fields can mismatch in CPython
# assert winver.major == winver.platform_version[0] assert winver.major == winver.platform_version[0]
# assert winver.minor == winver.platform_version[1] assert winver.minor == winver.platform_version[1]
# assert winver.build == winver.platform_version[2] # assert winver.build == winver.platform_version[2]
# test int_max_str_digits getter and setter # test int_max_str_digits getter and setter

View File

@@ -31,10 +31,11 @@ mod _overlapped {
}, },
System::Threading::INFINITE, System::Threading::INFINITE,
}; };
#[pyattr(once)]
fn INVALID_HANDLE_VALUE(_vm: &VirtualMachine) -> isize { #[pyattr]
windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE as isize const INVALID_HANDLE_VALUE: isize =
} unsafe { std::mem::transmute(windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE) };
#[pyattr] #[pyattr]
const NULL: isize = 0; const NULL: isize = 0;
@@ -263,7 +264,7 @@ mod _overlapped {
type Args = (isize,); type Args = (isize,);
fn py_new(cls: PyTypeRef, (mut event,): Self::Args, vm: &VirtualMachine) -> PyResult { fn py_new(cls: PyTypeRef, (mut event,): Self::Args, vm: &VirtualMachine) -> PyResult {
if event == INVALID_HANDLE_VALUE(vm) { if event == INVALID_HANDLE_VALUE {
event = unsafe { event = unsafe {
windows_sys::Win32::System::Threading::CreateEventA( windows_sys::Win32::System::Threading::CreateEventA(
std::ptr::null(), std::ptr::null(),

View File

@@ -22,12 +22,23 @@ mod sys {
vm::{Settings, VirtualMachine}, vm::{Settings, VirtualMachine},
}; };
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
#[cfg(windows)]
use std::os::windows::ffi::OsStrExt;
use std::{ use std::{
env::{self, VarError}, env::{self, VarError},
path, path,
sync::atomic::Ordering, sync::atomic::Ordering,
}; };
#[cfg(windows)]
use windows_sys::Win32::{
Foundation::MAX_PATH,
Storage::FileSystem::{
GetFileVersionInfoSizeW, GetFileVersionInfoW, VS_FIXEDFILEINFO, VerQueryValueW,
},
System::LibraryLoader::{GetModuleFileNameW, GetModuleHandleW},
};
// not the same as CPython (e.g. rust's x86_x64-unknown-linux-gnu is just x86_64-linux-gnu) // not the same as CPython (e.g. rust's x86_x64-unknown-linux-gnu is just x86_64-linux-gnu)
// but hopefully that's just an implementation detail? TODO: copy CPython's multiarch exactly, // but hopefully that's just an implementation detail? TODO: copy CPython's multiarch exactly,
// https://github.com/python/cpython/blob/3.8/configure.ac#L725 // https://github.com/python/cpython/blob/3.8/configure.ac#L725
@@ -485,6 +496,78 @@ mod sys {
vm.trace_func.borrow().clone() vm.trace_func.borrow().clone()
} }
#[cfg(windows)]
fn get_kernel32_version() -> std::io::Result<(u32, u32, u32)> {
unsafe {
// Create a wide string for "kernel32.dll"
let module_name: Vec<u16> = std::ffi::OsStr::new("kernel32.dll")
.encode_wide()
.chain(Some(0))
.collect();
let h_kernel32 = GetModuleHandleW(module_name.as_ptr());
if h_kernel32.is_null() {
return Err(std::io::Error::last_os_error());
}
// Prepare a buffer for the module file path
let mut kernel32_path = [0u16; MAX_PATH as usize];
let len = GetModuleFileNameW(
h_kernel32,
kernel32_path.as_mut_ptr(),
kernel32_path.len() as u32,
);
if len == 0 {
return Err(std::io::Error::last_os_error());
}
// Get the size of the version information block
let verblock_size =
GetFileVersionInfoSizeW(kernel32_path.as_ptr(), std::ptr::null_mut());
if verblock_size == 0 {
return Err(std::io::Error::last_os_error());
}
// Allocate a buffer to hold the version information
let mut verblock = vec![0u8; verblock_size as usize];
if GetFileVersionInfoW(
kernel32_path.as_ptr(),
0,
verblock_size,
verblock.as_mut_ptr() as *mut _,
) == 0
{
return Err(std::io::Error::last_os_error());
}
// Prepare an empty sub-block string (L"") as required by VerQueryValueW
let sub_block: Vec<u16> = std::ffi::OsStr::new("")
.encode_wide()
.chain(Some(0))
.collect();
let mut ffi_ptr: *mut VS_FIXEDFILEINFO = std::ptr::null_mut();
let mut ffi_len: u32 = 0;
if VerQueryValueW(
verblock.as_ptr() as *const _,
sub_block.as_ptr(),
&mut ffi_ptr as *mut *mut VS_FIXEDFILEINFO as *mut *mut _,
&mut ffi_len as *mut u32,
) == 0
|| ffi_ptr.is_null()
{
return Err(std::io::Error::last_os_error());
}
// Extract the version numbers from the VS_FIXEDFILEINFO structure.
let ffi = *ffi_ptr;
let real_major = (ffi.dwProductVersionMS >> 16) & 0xFFFF;
let real_minor = ffi.dwProductVersionMS & 0xFFFF;
let real_build = (ffi.dwProductVersionLS >> 16) & 0xFFFF;
Ok((real_major, real_minor, real_build))
}
}
#[cfg(windows)] #[cfg(windows)]
#[pyfunction] #[pyfunction]
fn getwindowsversion(vm: &VirtualMachine) -> PyResult<crate::builtins::tuple::PyTupleRef> { fn getwindowsversion(vm: &VirtualMachine) -> PyResult<crate::builtins::tuple::PyTupleRef> {
@@ -519,21 +602,18 @@ mod sys {
sp.into_string() sp.into_string()
.map_err(|_| vm.new_os_error("service pack is not ASCII".to_owned()))? .map_err(|_| vm.new_os_error("service pack is not ASCII".to_owned()))?
}; };
let real_version = get_kernel32_version().map_err(|e| vm.new_os_error(e.to_string()))?;
Ok(WindowsVersion { Ok(WindowsVersion {
major: version.dwMajorVersion, major: real_version.0,
minor: version.dwMinorVersion, minor: real_version.1,
build: version.dwBuildNumber, build: real_version.2,
platform: version.dwPlatformId, platform: version.dwPlatformId,
service_pack, service_pack,
service_pack_major: version.wServicePackMajor, service_pack_major: version.wServicePackMajor,
service_pack_minor: version.wServicePackMinor, service_pack_minor: version.wServicePackMinor,
suite_mask: version.wSuiteMask, suite_mask: version.wSuiteMask,
product_type: version.wProductType, product_type: version.wProductType,
platform_version: ( platform_version: (real_version.0, real_version.1, real_version.2), // TODO Provide accurate version, like CPython impl
version.dwMajorVersion,
version.dwMinorVersion,
version.dwBuildNumber,
), // TODO Provide accurate version, like CPython impl
} }
.into_struct_sequence(vm)) .into_struct_sequence(vm))
} }