mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
Compare commits
18 Commits
2025-03-10
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| f1863cc40f | |||
| 5f4ab448cb | |||
| 8a646957eb | |||
| 9aa703e07d | |||
| 287cfea18a | |||
|
|
bd94d8d50c | ||
|
|
7fab64ed9c | ||
|
|
8e22c399df | ||
|
|
7546ea91a9 | ||
|
|
8da66978bf | ||
|
|
8484bfa2e0 | ||
|
|
ff970b0e1c | ||
|
|
8be7e4327d | ||
|
|
82eeb237dc | ||
|
|
cbbadf562f | ||
|
|
4308321f39 | ||
|
|
985eebf9b0 | ||
|
|
87fae150da |
304
.github/scripts/code_review.py
vendored
Normal file
304
.github/scripts/code_review.py
vendored
Normal file
@@ -0,0 +1,304 @@
|
||||
"""Code Reviewer for Gitea."""
|
||||
|
||||
import asyncio
|
||||
import fnmatch
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
import aiohttp
|
||||
from model import Model
|
||||
|
||||
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN", "")
|
||||
HEADERS = {"Authorization": f"token {ACCESS_TOKEN}"}
|
||||
|
||||
GITHUB_EVENT_PATH = os.getenv("GITHUB_EVENT_PATH")
|
||||
try:
|
||||
with open(GITHUB_EVENT_PATH, "r") as f:
|
||||
EVENT_DATA = json.load(f)
|
||||
except FileNotFoundError:
|
||||
print("Failed to load event data.")
|
||||
exit(1)
|
||||
|
||||
FULL_CONTEXT_MODEL_NAME = os.getenv("FULL_CONTEXT_MODEL", "")
|
||||
SINGLE_CHUNK_MODEL_NAME = os.getenv("SINGLE_CHUNK_MODEL", "")
|
||||
FULL_CONTEXT_API_KEY = os.getenv("FULL_CONTEXT_API_KEY", "")
|
||||
SINGLE_CHUNK_API_KEY = os.getenv("SINGLE_CHUNK_API_KEY", "")
|
||||
|
||||
EXCLUDE_PATTERNS = os.getenv("EXCLUDE", "").split(",")
|
||||
|
||||
|
||||
def get_diff() -> str | None:
|
||||
"""Get code difference between base and head from Gitea.
|
||||
|
||||
Returns:
|
||||
str | None: code difference between base and head, or None if failed to get diff
|
||||
"""
|
||||
url = EVENT_DATA["pull_request"]["diff_url"]
|
||||
try:
|
||||
response = requests.get(url, headers=HEADERS)
|
||||
response.raise_for_status()
|
||||
return response.text
|
||||
except requests.RequestException as e:
|
||||
print(f"Failed to get diff: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def parse_diff(diff: str) -> list[dict[str, Any]]:
|
||||
"""Parse diff into list of dicts.
|
||||
|
||||
Args:
|
||||
diff: str, code difference between base and head
|
||||
|
||||
Returns:
|
||||
list[dict[str, Any]]: list of dicts, each dict represents a code chunks
|
||||
"""
|
||||
file_pattern = re.compile(
|
||||
r"(?s)diff --git a/(.+?) b/(.*?)\r?\n(.*?)(?=diff --git a/|$)", re.S
|
||||
)
|
||||
old_new_pattern = re.compile(r"(?m)^(---|\+\+\+)\s+(.*)$")
|
||||
chunk_range_pattern = re.compile(
|
||||
r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*?)?(?=@@|\Z)",
|
||||
re.MULTILINE | re.DOTALL,
|
||||
)
|
||||
list_diff = []
|
||||
for match in file_pattern.finditer(diff):
|
||||
diff_text = match.group(3)
|
||||
|
||||
old_new_match = list(old_new_pattern.finditer(diff_text))
|
||||
if len(old_new_match) != 2:
|
||||
continue
|
||||
|
||||
old_file = old_new_match[0].group(2)
|
||||
old_file = old_file.lstrip("a/") if old_file.startswith("a/") else old_file
|
||||
|
||||
new_file = old_new_match[1].group(2)
|
||||
if new_file == "/dev/null":
|
||||
print("Neglict deleted file")
|
||||
continue
|
||||
new_file = new_file.lstrip("b/")
|
||||
if any(fnmatch.fnmatch(new_file, pattern) for pattern in EXCLUDE_PATTERNS):
|
||||
print(f"Exclude file {new_file}")
|
||||
continue
|
||||
|
||||
output_diff_text = []
|
||||
for chunk_range_match in chunk_range_pattern.finditer(diff_text):
|
||||
old_idx = int(chunk_range_match.group(1))
|
||||
new_idx = int(chunk_range_match.group(3))
|
||||
for line in chunk_range_match.group(5).splitlines():
|
||||
if line.startswith("-"):
|
||||
output_diff_text.append(f"{old_idx} None {line}")
|
||||
old_idx += 1
|
||||
elif line.startswith("+"):
|
||||
output_diff_text.append(f"None {new_idx} {line}")
|
||||
new_idx += 1
|
||||
else:
|
||||
output_diff_text.append(f"{old_idx} {new_idx} {line}")
|
||||
old_idx += 1
|
||||
new_idx += 1
|
||||
|
||||
output_diff_text = "\n".join(output_diff_text)
|
||||
list_diff.append(
|
||||
{
|
||||
"file": new_file,
|
||||
"chunk": output_diff_text,
|
||||
}
|
||||
)
|
||||
return list_diff
|
||||
|
||||
|
||||
def create_comment(
|
||||
file: str, ai_response: list[dict[str, Any]]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Create comments for single chunk review.
|
||||
|
||||
Args:
|
||||
file: str, file name
|
||||
ai_response: list[dict[str, Any]], AI response for single chunk review
|
||||
|
||||
Returns:
|
||||
list[dict[str, Any]]: comments for single chunk review
|
||||
"""
|
||||
comments = []
|
||||
for ai_response in ai_response:
|
||||
comments.append(
|
||||
{
|
||||
"body": f"[REVIEW] {ai_response['reviewComment']}",
|
||||
"path": file,
|
||||
"new_position": int(ai_response["lineNumber"]),
|
||||
}
|
||||
)
|
||||
return comments
|
||||
|
||||
|
||||
async def analyze_single_chunks(
|
||||
single_chunk_model: Model, parsed_diff: list[dict[str, Any]]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Analyze single chunks and create comments.
|
||||
|
||||
Args:
|
||||
single_chunk_model: AI Session for single chunk analysis
|
||||
parsed_diff: list[dict[str, Any]], parsed diff
|
||||
|
||||
Returns:
|
||||
list[dict[str, Any]]: comments for single chunk review
|
||||
"""
|
||||
|
||||
async def process_single_chunk(diff: dict[str, Any]):
|
||||
file = diff["file"]
|
||||
chunk = diff["chunk"]
|
||||
response = await single_chunk_model.get_response_single_chunk(
|
||||
file, title, description, chunk
|
||||
)
|
||||
response = response.strip("`").lstrip("json").strip() or "[]"
|
||||
|
||||
try:
|
||||
response_json = json.loads(response)
|
||||
return create_comment(file, response_json)
|
||||
except json.JSONDecodeError:
|
||||
print(f"Failed to parse response: {response}")
|
||||
return []
|
||||
|
||||
title = EVENT_DATA["pull_request"]["title"]
|
||||
description = EVENT_DATA["pull_request"]["body"]
|
||||
tasks = [process_single_chunk(diff) for diff in parsed_diff]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
# Flatten the list of comments
|
||||
comments = [comment for result in results for comment in result]
|
||||
return comments
|
||||
|
||||
|
||||
async def get_file_content(file: str) -> str | None:
|
||||
"""Get file content from Gitea.
|
||||
|
||||
Args:
|
||||
file: str, file name
|
||||
|
||||
Returns:
|
||||
str | None: file content, or None if failed to get file content
|
||||
"""
|
||||
repo_url = EVENT_DATA["pull_request"]["head"]["repo"]["url"]
|
||||
branch = EVENT_DATA["pull_request"]["head"]["ref"]
|
||||
|
||||
replaced_file = file.replace("/", "%2F")
|
||||
url = f"{repo_url}/raw/{branch}%2F{replaced_file}?ref={branch}"
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession(headers=HEADERS) as session:
|
||||
async with session.get(url) as response:
|
||||
response.raise_for_status()
|
||||
return await response.text()
|
||||
except aiohttp.ClientError as e: # More specific exception handling
|
||||
print(f"Network error fetching {file}: {e}")
|
||||
except asyncio.TimeoutError:
|
||||
print(f"Timeout fetching {file}")
|
||||
return None
|
||||
|
||||
|
||||
async def analyze_full_context(
|
||||
full_context_model: Model, parsed_diff: list[dict[str, Any]]
|
||||
) -> str:
|
||||
"""Analyze full context and create review.
|
||||
|
||||
Args:
|
||||
full_context_model: AI Session for full context analysis
|
||||
parsed_diff: list[dict[str, Any]], parsed diff
|
||||
|
||||
Returns:
|
||||
str: review for full context
|
||||
"""
|
||||
|
||||
async def get_file_data(diff: dict[str, Any]):
|
||||
file = diff["file"]
|
||||
chunk = diff["chunk"]
|
||||
content = get_file_content(file)
|
||||
if content is None:
|
||||
return None
|
||||
return f"File: {file}\n{content}\nDiff: {chunk}"
|
||||
|
||||
tasks = [get_file_data(diff) for diff in parsed_diff]
|
||||
file_contents_list = await asyncio.gather(*tasks)
|
||||
|
||||
file_contents = [item for item in file_contents_list if item is not None]
|
||||
|
||||
if not file_contents:
|
||||
return ""
|
||||
|
||||
title = EVENT_DATA["pull_request"]["title"]
|
||||
description = EVENT_DATA["pull_request"]["body"]
|
||||
response = await full_context_model.get_response_full_context(
|
||||
title, description, file_contents
|
||||
)
|
||||
response = response.strip("`").lstrip("markdown").strip()
|
||||
return response
|
||||
|
||||
|
||||
def post_review(
|
||||
full_context_review: str, single_chunk_comments: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Post review to Gitea.
|
||||
|
||||
Args:
|
||||
full_context_review: str, review for full context
|
||||
single_chunk_comments: list[dict[str, Any]], comments for single chunk review
|
||||
"""
|
||||
repo_url = EVENT_DATA["pull_request"]["head"]["repo"]["url"]
|
||||
pull_number = EVENT_DATA["number"]
|
||||
commit_id = EVENT_DATA["pull_request"]["head"]["sha"]
|
||||
url = f"{repo_url}/pulls/{pull_number}/reviews"
|
||||
data = {
|
||||
"body": full_context_review,
|
||||
"event": "COMMENT",
|
||||
"comments": single_chunk_comments,
|
||||
"commit_id": commit_id,
|
||||
}
|
||||
response = requests.post(url, headers=HEADERS, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Code Reviewer for Gitea: Asynchronous version."""
|
||||
if EVENT_DATA["action"] not in ["opened", "synchronized"]:
|
||||
print("Unsupported event.")
|
||||
return
|
||||
|
||||
diff = get_diff()
|
||||
if diff is None:
|
||||
return
|
||||
elif not diff:
|
||||
print("No diff found.")
|
||||
return
|
||||
|
||||
full_context_model = Model(
|
||||
model=FULL_CONTEXT_MODEL_NAME,
|
||||
api_key=FULL_CONTEXT_API_KEY,
|
||||
is_full_context=True,
|
||||
)
|
||||
single_chunk_model = Model(
|
||||
model=SINGLE_CHUNK_MODEL_NAME,
|
||||
api_key=SINGLE_CHUNK_API_KEY,
|
||||
is_full_context=False,
|
||||
)
|
||||
|
||||
parsed_diff = parse_diff(diff)
|
||||
comments_task = asyncio.create_task(
|
||||
analyze_single_chunks(single_chunk_model, parsed_diff)
|
||||
)
|
||||
|
||||
if EVENT_DATA["action"] == "opened":
|
||||
full_context_response_task = asyncio.create_task(
|
||||
analyze_full_context(full_context_model, parsed_diff)
|
||||
)
|
||||
full_context_response = await full_context_response_task
|
||||
else:
|
||||
full_context_response = ""
|
||||
|
||||
comments = await comments_task
|
||||
post_review(full_context_response, comments)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
261
.github/scripts/model.py
vendored
Normal file
261
.github/scripts/model.py
vendored
Normal file
@@ -0,0 +1,261 @@
|
||||
"""Model for code review."""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import google.generativeai as genai
|
||||
import typing_extensions as typing
|
||||
from anthropic import AsyncAnthropic
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
|
||||
class GoogleResponse(typing.TypedDict):
|
||||
"""The response from Google model."""
|
||||
|
||||
lineNumber: int
|
||||
reviewComment: str
|
||||
|
||||
|
||||
class ModelProvider(Enum):
|
||||
"""The model provider."""
|
||||
|
||||
OPENAI = "openai"
|
||||
ANTHROPIC = "anthropic"
|
||||
GOOGLE = "google"
|
||||
DEEPSEEK = "deepseek"
|
||||
|
||||
@classmethod
|
||||
def from_model(cls, model: str) -> "ModelProvider":
|
||||
"""Get the model provider from the model name.
|
||||
|
||||
Args:
|
||||
model (str): The model name.
|
||||
|
||||
Returns:
|
||||
ModelProvider: The model provider.
|
||||
"""
|
||||
for prefix, provider in PREFIX_TO_MODEL.items():
|
||||
if model.startswith(prefix):
|
||||
return provider
|
||||
raise ValueError(f"Unknown model: {model}")
|
||||
|
||||
|
||||
PREFIX_TO_MODEL = {
|
||||
"gpt": ModelProvider.OPENAI,
|
||||
"o1": ModelProvider.OPENAI,
|
||||
"o3": ModelProvider.OPENAI,
|
||||
"claude": ModelProvider.ANTHROPIC,
|
||||
"gemini": ModelProvider.GOOGLE,
|
||||
"deepseek": ModelProvider.DEEPSEEK,
|
||||
}
|
||||
|
||||
|
||||
class Model:
|
||||
"""The model class.
|
||||
|
||||
Attributes:
|
||||
model (str): The model name.
|
||||
api_key (str): The API key.
|
||||
system_prompt (str): The system prompt.
|
||||
max_tokens (int): The maximum tokens.
|
||||
"""
|
||||
|
||||
def __init__( # noqa: D107
|
||||
self,
|
||||
model: str,
|
||||
api_key: str,
|
||||
is_full_context: bool,
|
||||
max_tokens: int = 4196,
|
||||
):
|
||||
self.model = model
|
||||
self.system_prompt = (
|
||||
FULL_CONTEXT_SYSTEM_PROMPT
|
||||
if is_full_context
|
||||
else SINGLE_CHUNK_SYSTEM_PROMPT
|
||||
)
|
||||
self.max_tokens = max_tokens
|
||||
self.provider = ModelProvider.from_model(model)
|
||||
self.session = self.create_session(api_key)
|
||||
|
||||
def create_session(self, api_key: str) -> Any:
|
||||
"""Create a session for the model.
|
||||
|
||||
Args:
|
||||
api_key (str): The API key.
|
||||
|
||||
Returns:
|
||||
Any: The session.
|
||||
"""
|
||||
match self.provider:
|
||||
case ModelProvider.OPENAI:
|
||||
return AsyncOpenAI(api_key=api_key)
|
||||
case ModelProvider.ANTHROPIC:
|
||||
return AsyncAnthropic(api_key=api_key)
|
||||
case ModelProvider.GOOGLE:
|
||||
genai.configure(api_key=api_key)
|
||||
return genai.GenerativeModel(
|
||||
model_name=self.model, system_instruction=self.system_prompt
|
||||
)
|
||||
case ModelProvider.DEEPSEEK:
|
||||
return AsyncOpenAI(api_key=api_key, base_url="https://api.deepseek.com")
|
||||
|
||||
async def request(self, prompt: str) -> str:
|
||||
"""Request the model to generate a response.
|
||||
|
||||
Args:
|
||||
prompt (str): The prompt to generate a response for.
|
||||
|
||||
Returns:
|
||||
str: The generated response.
|
||||
"""
|
||||
match self.provider:
|
||||
case ModelProvider.OPENAI | ModelProvider.DEEPSEEK:
|
||||
response = await self.session.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{"role": "system", "content": self.system_prompt},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
temperature=0.2,
|
||||
max_tokens=self.max_tokens,
|
||||
top_p=1,
|
||||
frequency_penalty=0,
|
||||
presence_penalty=0,
|
||||
)
|
||||
return response.choices[0].message.content.strip()
|
||||
case ModelProvider.ANTHROPIC:
|
||||
response = await self.session.messages.create(
|
||||
model=self.model,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
system=[
|
||||
{
|
||||
"type": "text",
|
||||
"text": self.system_prompt,
|
||||
"cache_control": {"type": "ephemeral"},
|
||||
}
|
||||
],
|
||||
temperature=0.2,
|
||||
max_tokens=self.max_tokens,
|
||||
)
|
||||
return response.content[0].text.strip()
|
||||
case ModelProvider.GOOGLE:
|
||||
response = await self.session.generate_content_async(
|
||||
prompt,
|
||||
generation_config=genai.GenerationConfig(
|
||||
response_mime_type="application/json",
|
||||
response_schema=list[GoogleResponse],
|
||||
),
|
||||
)
|
||||
return response.text.strip()
|
||||
|
||||
async def get_response_single_chunk(
|
||||
self, file: str, title: str, description: str, chunk: str
|
||||
) -> str:
|
||||
"""Get the response for a single chunk.
|
||||
|
||||
Args:
|
||||
file (str): The file name.
|
||||
title (str): The pull request title.
|
||||
description (str): The pull request description.
|
||||
chunk (str): The diff chunk.
|
||||
|
||||
Returns:
|
||||
str: The response.
|
||||
"""
|
||||
prompt = SINGLE_CHUNK_USER_PROMPT.format(file, title, description, chunk)
|
||||
return await self.request(prompt)
|
||||
|
||||
async def get_response_full_context(
|
||||
self, title: str, description: str, file_contents: list[str]
|
||||
) -> str:
|
||||
"""Get the response for full context.
|
||||
|
||||
Args:
|
||||
title (str): The pull request title.
|
||||
description (str): The pull request description.
|
||||
file_contents (list[str]): The file contents, diffs.
|
||||
|
||||
Returns:
|
||||
str: The response.
|
||||
"""
|
||||
try:
|
||||
prompt = FULL_CONTEXT_USER_PROMPT.format(
|
||||
title, description, "\n".join(file_contents)
|
||||
)
|
||||
return await self.request(prompt)
|
||||
except Exception as e:
|
||||
print(f"Error during full context response: {e}")
|
||||
print(prompt)
|
||||
return None
|
||||
|
||||
|
||||
SINGLE_CHUNK_SYSTEM_PROMPT = (
|
||||
"Your task is to review pull requests. Instructions:\n"
|
||||
"- Provide the response in the following JSON format: "
|
||||
"""[{{"lineNumber": int, "reviewComment": str}}] \n"""
|
||||
"- lineNumber is about the line number of the code that in new file. \n"
|
||||
"- lineNumber can be found at the front of each line. \n"
|
||||
"- At the first number is old line number, the second number is new line number. \n"
|
||||
"- If the line starts with `+`, it means the line is added. \n"
|
||||
"- If the line starts with `-`, it means the line is deleted. \n"
|
||||
"- Evaluate whether the code changes and additions are appropriate "
|
||||
"and if the new code structure is suitable. \n"
|
||||
"- Do not give positive comments or compliments. \n"
|
||||
"- Provide comments and suggestions ONLY if there is something to improve"
|
||||
"otherwise return an empty array. \n"
|
||||
"- Write the comment in GitHub Markdown format. \n"
|
||||
"- Use the given description only for the overall context "
|
||||
"and only comment the code. \n"
|
||||
"- Do not suggest type hint or naming convention. \n"
|
||||
"- IMPORTANT: NEVER suggest adding comments to the code. \n"
|
||||
)
|
||||
SINGLE_CHUNK_USER_PROMPT = (
|
||||
"Review the following code diff in the file "
|
||||
"{} and take the pull request title and description into account "
|
||||
"when writing the response. \n"
|
||||
"Pull request title: {} \n"
|
||||
"Pull request description: \n"
|
||||
"--- \n"
|
||||
"{} \n"
|
||||
"--- \n"
|
||||
"Git diff to review: \n"
|
||||
"```diff \n"
|
||||
"{} \n"
|
||||
"```"
|
||||
)
|
||||
|
||||
FULL_CONTEXT_SYSTEM_PROMPT = (
|
||||
"You are an experienced software engineer specializing in reviewing pull "
|
||||
"requests. Your task is to provide an overall code review summary for a PR. "
|
||||
"Focus on assessing the following aspects:\n"
|
||||
"1. **Code Structure & Architecture:** "
|
||||
"Evaluate whether the code is well-organized, modular, "
|
||||
"and adheres to clean code principles. Suggest improvements if needed.\n"
|
||||
"2. **Refactoring Opportunities:** "
|
||||
"Identify areas where the code can be optimized or simplified without changing "
|
||||
"its behavior.\n"
|
||||
"3. **Potential Future Problems:** "
|
||||
"Highlight possible scalability, maintainability, or dependency issues that might "
|
||||
"arise in the future based on the current implementation.\n"
|
||||
"Be constructive and clear in your feedback. Avoid commenting on trivial issues "
|
||||
"or syntax errors—focus on high-level feedback.\n"
|
||||
"Precise instructions:\n"
|
||||
"- Do not give positive comments or compliments.\n"
|
||||
"- Provide comments and suggestions ONLY if there is something to improve, "
|
||||
"otherwise return an empty string.\n"
|
||||
"- Write the comment in GitHub Markdown format.\n"
|
||||
"- Do not start with 'markdown' or '```markdown'.\n"
|
||||
"- IMPORTANT: Give example code block or pseudo code if you can.\n"
|
||||
)
|
||||
|
||||
FULL_CONTEXT_USER_PROMPT = (
|
||||
"Review the following code and take the pull request title "
|
||||
"and description into account when writing the response. \n"
|
||||
"Pull request title: {} \n"
|
||||
"Pull request description: \n"
|
||||
"--- \n"
|
||||
"{} \n"
|
||||
"--- \n"
|
||||
"Code to review: \n"
|
||||
"{}"
|
||||
)
|
||||
196
.github/workflows/ci.yaml
vendored
196
.github/workflows/ci.yaml
vendored
@@ -1,42 +1,14 @@
|
||||
on:
|
||||
push:
|
||||
branches: [main, release]
|
||||
pull_request:
|
||||
branches: [dev]
|
||||
types: [unlabeled, opened, synchronize, reopened]
|
||||
merge_group:
|
||||
workflow_dispatch:
|
||||
|
||||
name: CI
|
||||
|
||||
# Cancel previous workflows if they are the same workflow on same ref (branch/tags)
|
||||
# with the same event (push/pull_request) even they are in progress.
|
||||
# This setting will help reduce the number of duplicated workflows.
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
|
||||
cancel-in-progress: true
|
||||
name: Dev-CI
|
||||
|
||||
env:
|
||||
CARGO_ARGS: --no-default-features --features stdlib,importlib,encodings,sqlite,ssl
|
||||
# Skip additional tests on Windows. They are checked on Linux and MacOS.
|
||||
# test_glob: many failing tests
|
||||
# test_io: many failing tests
|
||||
# test_os: many failing tests
|
||||
# test_pathlib: support.rmtree() failing
|
||||
# test_posixpath: OSError: (22, 'The filename, directory name, or volume label syntax is incorrect. (os error 123)')
|
||||
# test_venv: couple of failing tests
|
||||
WINDOWS_SKIPS: >-
|
||||
test_glob
|
||||
test_io
|
||||
test_os
|
||||
test_rlcompleter
|
||||
test_pathlib
|
||||
test_posixpath
|
||||
test_venv
|
||||
# configparser: https://github.com/RustPython/RustPython/issues/4995#issuecomment-1582397417
|
||||
# socketserver: seems related to configparser crash.
|
||||
MACOS_SKIPS: >-
|
||||
test_configparser
|
||||
test_socketserver
|
||||
CARGO_ARGS: --no-default-features --features stdlib,zlib,importlib,encodings,sqlite,ssl
|
||||
# PLATFORM_INDEPENDENT_TESTS are tests that do not depend on the underlying OS. They are currently
|
||||
# only run on Linux to speed up the CI.
|
||||
PLATFORM_INDEPENDENT_TESTS: >-
|
||||
@@ -117,11 +89,7 @@ jobs:
|
||||
env:
|
||||
RUST_BACKTRACE: full
|
||||
name: Run rust tests
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [macos-latest, ubuntu-latest, windows-latest]
|
||||
fail-fast: false
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
@@ -129,26 +97,12 @@ jobs:
|
||||
components: clippy
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- name: Set up the Windows environment
|
||||
shell: bash
|
||||
run: |
|
||||
cargo install --target-dir=target -v cargo-vcpkg
|
||||
cargo vcpkg -v build
|
||||
if: runner.os == 'Windows'
|
||||
- name: Set up the Mac environment
|
||||
run: brew install autoconf automake libtool
|
||||
if: runner.os == 'macOS'
|
||||
|
||||
- name: run clippy
|
||||
run: cargo clippy ${{ env.CARGO_ARGS }} --workspace --exclude rustpython_wasm -- -Dwarnings
|
||||
|
||||
- name: run rust tests
|
||||
run: cargo test --workspace --exclude rustpython_wasm --verbose --features threading ${{ env.CARGO_ARGS }}
|
||||
if: runner.os != 'macOS'
|
||||
- name: run rust tests
|
||||
run: cargo test --workspace --exclude rustpython_wasm --exclude rustpython-jit --verbose --features threading ${{ env.CARGO_ARGS }}
|
||||
if: runner.os == 'macOS'
|
||||
|
||||
|
||||
- name: check compilation without threading
|
||||
run: cargo check ${{ env.CARGO_ARGS }}
|
||||
|
||||
@@ -156,24 +110,6 @@ jobs:
|
||||
run:
|
||||
cargo run --manifest-path example_projects/barebone/Cargo.toml
|
||||
cargo run --manifest-path example_projects/frozen_stdlib/Cargo.toml
|
||||
if: runner.os == 'Linux'
|
||||
|
||||
- name: prepare AppleSilicon build
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
with:
|
||||
target: aarch64-apple-darwin
|
||||
if: runner.os == 'macOS'
|
||||
- name: Check compilation for Apple Silicon
|
||||
run: cargo check --target aarch64-apple-darwin
|
||||
if: runner.os == 'macOS'
|
||||
- name: prepare iOS build
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
with:
|
||||
target: aarch64-apple-ios
|
||||
if: runner.os == 'macOS'
|
||||
- name: Check compilation for iOS
|
||||
run: cargo check --target aarch64-apple-ios
|
||||
if: runner.os == 'macOS'
|
||||
|
||||
exotic_targets:
|
||||
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
|
||||
@@ -240,11 +176,7 @@ jobs:
|
||||
env:
|
||||
RUST_BACKTRACE: full
|
||||
name: Run snippets and cpython tests
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [macos-latest, ubuntu-latest, windows-latest]
|
||||
fail-fast: false
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
@@ -252,54 +184,29 @@ jobs:
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ env.PYTHON_VERSION }}
|
||||
- name: Set up the Windows environment
|
||||
shell: bash
|
||||
run: |
|
||||
cargo install cargo-vcpkg
|
||||
cargo vcpkg build
|
||||
if: runner.os == 'Windows'
|
||||
- name: Set up the Mac environment
|
||||
run: brew install autoconf automake libtool openssl@3
|
||||
if: runner.os == 'macOS'
|
||||
- name: build rustpython
|
||||
run: cargo build --release --verbose --features=threading ${{ env.CARGO_ARGS }}
|
||||
if: runner.os == 'macOS'
|
||||
- name: build rustpython
|
||||
run: cargo build --release --verbose --features=threading ${{ env.CARGO_ARGS }},jit
|
||||
if: runner.os != 'macOS'
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ env.PYTHON_VERSION }}
|
||||
- name: run snippets
|
||||
run: python -m pip install -r requirements.txt && pytest -v
|
||||
working-directory: ./extra_tests
|
||||
- if: runner.os == 'Linux'
|
||||
name: run cpython platform-independent tests
|
||||
- name: run cpython platform-independent tests
|
||||
run:
|
||||
target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v ${{ env.PLATFORM_INDEPENDENT_TESTS }}
|
||||
- if: runner.os == 'Linux'
|
||||
name: run cpython platform-dependent tests (Linux)
|
||||
- name: run cpython platform-dependent tests (Linux)
|
||||
run: target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }}
|
||||
- if: runner.os == 'macOS'
|
||||
name: run cpython platform-dependent tests (MacOS)
|
||||
run: target/release/rustpython -m test -j 1 --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} ${{ env.MACOS_SKIPS }}
|
||||
- if: runner.os == 'Windows'
|
||||
name: run cpython platform-dependent tests (windows partial - fixme)
|
||||
run:
|
||||
target/release/rustpython -m test -j 1 --slowest --fail-env-changed -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} ${{ env.WINDOWS_SKIPS }}
|
||||
- if: runner.os != 'Windows'
|
||||
name: check that --install-pip succeeds
|
||||
- name: check that --install-pip succeeds
|
||||
run: |
|
||||
mkdir site-packages
|
||||
target/release/rustpython --install-pip ensurepip --user
|
||||
target/release/rustpython -m pip install six
|
||||
- if: runner.os != 'Windows'
|
||||
name: Check that ensurepip succeeds.
|
||||
- name: Check that ensurepip succeeds.
|
||||
run: |
|
||||
target/release/rustpython -m ensurepip
|
||||
target/release/rustpython -c "import pip"
|
||||
- if: runner.os != 'Windows'
|
||||
name: Check if pip inside venv is functional
|
||||
- name: Check if pip inside venv is functional
|
||||
run: |
|
||||
target/release/rustpython -m venv testvenv
|
||||
testvenv/bin/rustpython -m pip install wheel
|
||||
@@ -348,84 +255,3 @@ jobs:
|
||||
# a memory leak, at least until we have proper cyclic gc
|
||||
run: MIRIFLAGS='-Zmiri-ignore-leaks' cargo +nightly miri test -p rustpython-vm -- miri_test
|
||||
|
||||
wasm:
|
||||
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
|
||||
name: Check the WASM package and demo
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: install wasm-pack
|
||||
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
|
||||
- name: install geckodriver
|
||||
run: |
|
||||
wget https://github.com/mozilla/geckodriver/releases/download/v0.36.0/geckodriver-v0.36.0-linux64.tar.gz
|
||||
mkdir geckodriver
|
||||
tar -xzf geckodriver-v0.36.0-linux64.tar.gz -C geckodriver
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ env.PYTHON_VERSION }}
|
||||
- run: python -m pip install -r requirements.txt
|
||||
working-directory: ./wasm/tests
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
cache: "npm"
|
||||
cache-dependency-path: "wasm/demo/package-lock.json"
|
||||
- name: run test
|
||||
run: |
|
||||
export PATH=$PATH:`pwd`/../../geckodriver
|
||||
npm install
|
||||
npm run test
|
||||
env:
|
||||
NODE_OPTIONS: "--openssl-legacy-provider"
|
||||
working-directory: ./wasm/demo
|
||||
- uses: mwilliamson/setup-wabt-action@v3
|
||||
with: { wabt-version: "1.0.36" }
|
||||
- name: check wasm32-unknown without js
|
||||
run: |
|
||||
cd wasm/wasm-unknown-test
|
||||
cargo build --release --verbose
|
||||
if wasm-objdump -xj Import target/wasm32-unknown-unknown/release/wasm_unknown_test.wasm; then
|
||||
echo "ERROR: wasm32-unknown module expects imports from the host environment" >2
|
||||
fi
|
||||
- name: build notebook demo
|
||||
if: github.ref == 'refs/heads/release'
|
||||
run: |
|
||||
npm install
|
||||
npm run dist
|
||||
mv dist ../demo/dist/notebook
|
||||
env:
|
||||
NODE_OPTIONS: "--openssl-legacy-provider"
|
||||
working-directory: ./wasm/notebook
|
||||
- name: Deploy demo to Github Pages
|
||||
if: success() && github.ref == 'refs/heads/release'
|
||||
uses: peaceiris/actions-gh-pages@v4
|
||||
env:
|
||||
ACTIONS_DEPLOY_KEY: ${{ secrets.ACTIONS_DEMO_DEPLOY_KEY }}
|
||||
PUBLISH_DIR: ./wasm/demo/dist
|
||||
EXTERNAL_REPOSITORY: RustPython/demo
|
||||
PUBLISH_BRANCH: master
|
||||
|
||||
wasm-wasi:
|
||||
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip:ci') }}
|
||||
name: Run snippets and cpython tests on wasm-wasi
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
with:
|
||||
target: wasm32-wasip1
|
||||
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: Setup Wasmer
|
||||
uses: wasmerio/setup-wasmer@v3
|
||||
- name: Install clang
|
||||
run: sudo apt-get update && sudo apt-get install clang -y
|
||||
- name: build rustpython
|
||||
run: cargo build --release --target wasm32-wasip1 --features freeze-stdlib,stdlib --verbose
|
||||
- name: run snippets
|
||||
run: wasmer run --dir `pwd` target/wasm32-wasip1/release/rustpython.wasm -- `pwd`/extra_tests/snippets/stdlib_random.py
|
||||
- name: run cpython unittest
|
||||
run: wasmer run --dir `pwd` target/wasm32-wasip1/release/rustpython.wasm -- `pwd`/Lib/test/test_int.py
|
||||
|
||||
36
.github/workflows/code-review.yml
vendored
Normal file
36
.github/workflows/code-review.yml
vendored
Normal file
@@ -0,0 +1,36 @@
|
||||
name: Code Review
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [dev]
|
||||
types: [opened, synchronize]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
review:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install aiohttp requests py-gitea openai anthropic google-generativeai
|
||||
|
||||
- name: Run Code Review
|
||||
env:
|
||||
ACCESS_TOKEN: ${{ secrets.ACCESS_TOKEN }}
|
||||
FULL_CONTEXT_MODEL: o3-mini
|
||||
FULL_CONTEXT_API_KEY: ${{ secrets.OPENAI_API_KEY }}
|
||||
SINGLE_CHUNK_MODEL: gemini-2.0-flash-exp
|
||||
SINGLE_CHUNK_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
|
||||
EXCLUDE: "*.yml,*.yaml"
|
||||
run: python .gitea/scripts/code_review.py
|
||||
|
||||
2
.github/workflows/cron-ci.yaml
vendored
2
.github/workflows/cron-ci.yaml
vendored
@@ -101,7 +101,7 @@ jobs:
|
||||
[ -f ./_data/whats_left.temp ] && cp ./_data/whats_left.temp ./_data/whats_left_lastrun.temp
|
||||
cp ../whats_left.temp ./_data/whats_left.temp
|
||||
rm ./_data/whats_left/modules.csv
|
||||
echo -e "modules" > ./_data/whats_left/modules.csv
|
||||
echo -e "module" > ./_data/whats_left/modules.csv
|
||||
cat ./_data/whats_left.temp | grep "(entire module)" | cut -d ' ' -f 1 | sort >> ./_data/whats_left/modules.csv
|
||||
git add -A
|
||||
if git -c user.name="Github Actions" -c user.email="actions@github.com" commit -m "Update what is left results" --author="$GITHUB_ACTOR"; then
|
||||
|
||||
5
Lib/_osx_support.py
vendored
5
Lib/_osx_support.py
vendored
@@ -507,6 +507,11 @@ def get_platform_osx(_config_vars, osname, release, machine):
|
||||
# MACOSX_DEPLOYMENT_TARGET.
|
||||
|
||||
macver = _config_vars.get('MACOSX_DEPLOYMENT_TARGET', '')
|
||||
if macver and '.' not in macver:
|
||||
# Ensure that the version includes at least a major
|
||||
# and minor version, even if MACOSX_DEPLOYMENT_TARGET
|
||||
# is set to a single-label version like "14".
|
||||
macver += '.0'
|
||||
macrelease = _get_system_version() or macver
|
||||
macver = macver or macrelease
|
||||
|
||||
|
||||
1012
Lib/cgi.py
vendored
1012
Lib/cgi.py
vendored
File diff suppressed because it is too large
Load Diff
39
Lib/email/message.py
vendored
39
Lib/email/message.py
vendored
@@ -7,7 +7,6 @@
|
||||
__all__ = ['Message', 'EmailMessage']
|
||||
|
||||
import re
|
||||
import uu
|
||||
import quopri
|
||||
from io import BytesIO, StringIO
|
||||
|
||||
@@ -101,6 +100,35 @@ def _unquotevalue(value):
|
||||
return utils.unquote(value)
|
||||
|
||||
|
||||
def _decode_uu(encoded):
|
||||
"""Decode uuencoded data."""
|
||||
decoded_lines = []
|
||||
encoded_lines_iter = iter(encoded.splitlines())
|
||||
for line in encoded_lines_iter:
|
||||
if line.startswith(b"begin "):
|
||||
mode, _, path = line.removeprefix(b"begin ").partition(b" ")
|
||||
try:
|
||||
int(mode, base=8)
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
else:
|
||||
raise ValueError("`begin` line not found")
|
||||
for line in encoded_lines_iter:
|
||||
if not line:
|
||||
raise ValueError("Truncated input")
|
||||
elif line.strip(b' \t\r\n\f') == b'end':
|
||||
break
|
||||
try:
|
||||
decoded_line = binascii.a2b_uu(line)
|
||||
except binascii.Error:
|
||||
# Workaround for broken uuencoders by /Fredrik Lundh
|
||||
nbytes = (((line[0]-32) & 63) * 4 + 5) // 3
|
||||
decoded_line = binascii.a2b_uu(line[:nbytes])
|
||||
decoded_lines.append(decoded_line)
|
||||
|
||||
return b''.join(decoded_lines)
|
||||
|
||||
class Message:
|
||||
"""Basic message object.
|
||||
@@ -288,13 +316,10 @@ class Message:
|
||||
self.policy.handle_defect(self, defect)
|
||||
return value
|
||||
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
|
||||
in_file = BytesIO(bpayload)
|
||||
out_file = BytesIO()
|
||||
try:
|
||||
uu.decode(in_file, out_file, quiet=True)
|
||||
return out_file.getvalue()
|
||||
except uu.Error:
|
||||
# Some decoding problem
|
||||
return _decode_uu(bpayload)
|
||||
except ValueError:
|
||||
# Some decoding problem.
|
||||
return bpayload
|
||||
if isinstance(payload, str):
|
||||
return bpayload
|
||||
|
||||
645
Lib/test/test_cgi.py
vendored
645
Lib/test/test_cgi.py
vendored
@@ -1,645 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from collections import namedtuple
|
||||
from io import StringIO, BytesIO
|
||||
from test import support
|
||||
from test.support import warnings_helper
|
||||
|
||||
cgi = warnings_helper.import_deprecated("cgi")
|
||||
|
||||
|
||||
class HackedSysModule:
|
||||
# The regression test will have real values in sys.argv, which
|
||||
# will completely confuse the test of the cgi module
|
||||
argv = []
|
||||
stdin = sys.stdin
|
||||
|
||||
cgi.sys = HackedSysModule()
|
||||
|
||||
class ComparableException:
|
||||
def __init__(self, err):
|
||||
self.err = err
|
||||
|
||||
def __str__(self):
|
||||
return str(self.err)
|
||||
|
||||
def __eq__(self, anExc):
|
||||
if not isinstance(anExc, Exception):
|
||||
return NotImplemented
|
||||
return (self.err.__class__ == anExc.__class__ and
|
||||
self.err.args == anExc.args)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return getattr(self.err, attr)
|
||||
|
||||
def do_test(buf, method):
|
||||
env = {}
|
||||
if method == "GET":
|
||||
fp = None
|
||||
env['REQUEST_METHOD'] = 'GET'
|
||||
env['QUERY_STRING'] = buf
|
||||
elif method == "POST":
|
||||
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
|
||||
env['REQUEST_METHOD'] = 'POST'
|
||||
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
|
||||
env['CONTENT_LENGTH'] = str(len(buf))
|
||||
else:
|
||||
raise ValueError("unknown method: %s" % method)
|
||||
try:
|
||||
return cgi.parse(fp, env, strict_parsing=1)
|
||||
except Exception as err:
|
||||
return ComparableException(err)
|
||||
|
||||
parse_strict_test_cases = [
|
||||
("", {}),
|
||||
("&", ValueError("bad query field: ''")),
|
||||
("&&", ValueError("bad query field: ''")),
|
||||
# Should the next few really be valid?
|
||||
("=", {}),
|
||||
("=&=", {}),
|
||||
# This rest seem to make sense
|
||||
("=a", {'': ['a']}),
|
||||
("&=a", ValueError("bad query field: ''")),
|
||||
("=a&", ValueError("bad query field: ''")),
|
||||
("=&a", ValueError("bad query field: 'a'")),
|
||||
("b=a", {'b': ['a']}),
|
||||
("b+=a", {'b ': ['a']}),
|
||||
("a=b=a", {'a': ['b=a']}),
|
||||
("a=+b=a", {'a': [' b=a']}),
|
||||
("&b=a", ValueError("bad query field: ''")),
|
||||
("b&=a", ValueError("bad query field: 'b'")),
|
||||
("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
|
||||
("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
|
||||
("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
|
||||
("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
|
||||
{'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
|
||||
'cuyer': ['r'],
|
||||
'expire': ['964546263'],
|
||||
'kid': ['130003.300038'],
|
||||
'lobale': ['en-US'],
|
||||
'order_id': ['0bb2e248638833d48cb7fed300000f1b'],
|
||||
'ss': ['env'],
|
||||
'view': ['bustomer'],
|
||||
}),
|
||||
|
||||
("group_id=5470&set=custom&_assigned_to=31392&_status=1&_category=100&SUBMIT=Browse",
|
||||
{'SUBMIT': ['Browse'],
|
||||
'_assigned_to': ['31392'],
|
||||
'_category': ['100'],
|
||||
'_status': ['1'],
|
||||
'group_id': ['5470'],
|
||||
'set': ['custom'],
|
||||
})
|
||||
]
|
||||
|
||||
def norm(seq):
|
||||
return sorted(seq, key=repr)
|
||||
|
||||
def first_elts(list):
|
||||
return [p[0] for p in list]
|
||||
|
||||
def first_second_elts(list):
|
||||
return [(p[0], p[1][0]) for p in list]
|
||||
|
||||
def gen_result(data, environ):
|
||||
encoding = 'latin-1'
|
||||
fake_stdin = BytesIO(data.encode(encoding))
|
||||
fake_stdin.seek(0)
|
||||
form = cgi.FieldStorage(fp=fake_stdin, environ=environ, encoding=encoding)
|
||||
|
||||
result = {}
|
||||
for k, v in dict(form).items():
|
||||
result[k] = isinstance(v, list) and form.getlist(k) or v.value
|
||||
|
||||
return result
|
||||
|
||||
class CgiTests(unittest.TestCase):
|
||||
|
||||
def test_parse_multipart(self):
|
||||
fp = BytesIO(POSTDATA.encode('latin1'))
|
||||
env = {'boundary': BOUNDARY.encode('latin1'),
|
||||
'CONTENT-LENGTH': '558'}
|
||||
result = cgi.parse_multipart(fp, env)
|
||||
expected = {'submit': [' Add '], 'id': ['1234'],
|
||||
'file': [b'Testing 123.\n'], 'title': ['']}
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_parse_multipart_without_content_length(self):
|
||||
POSTDATA = '''--JfISa01
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
|
||||
just a string
|
||||
|
||||
--JfISa01--
|
||||
'''
|
||||
fp = BytesIO(POSTDATA.encode('latin1'))
|
||||
env = {'boundary': 'JfISa01'.encode('latin1')}
|
||||
result = cgi.parse_multipart(fp, env)
|
||||
expected = {'submit-name': ['just a string\n']}
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# TODO RUSTPYTHON - see https://github.com/RustPython/RustPython/issues/935
|
||||
@unittest.expectedFailure
|
||||
def test_parse_multipart_invalid_encoding(self):
|
||||
BOUNDARY = "JfISa01"
|
||||
POSTDATA = """--JfISa01
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
Content-Length: 3
|
||||
|
||||
\u2603
|
||||
--JfISa01"""
|
||||
fp = BytesIO(POSTDATA.encode('utf8'))
|
||||
env = {'boundary': BOUNDARY.encode('latin1'),
|
||||
'CONTENT-LENGTH': str(len(POSTDATA.encode('utf8')))}
|
||||
result = cgi.parse_multipart(fp, env, encoding="ascii",
|
||||
errors="surrogateescape")
|
||||
expected = {'submit-name': ["\udce2\udc98\udc83"]}
|
||||
self.assertEqual(result, expected)
|
||||
self.assertEqual("\u2603".encode('utf8'),
|
||||
result["submit-name"][0].encode('utf8', 'surrogateescape'))
|
||||
|
||||
def test_fieldstorage_properties(self):
|
||||
fs = cgi.FieldStorage()
|
||||
self.assertFalse(fs)
|
||||
self.assertIn("FieldStorage", repr(fs))
|
||||
self.assertEqual(list(fs), list(fs.keys()))
|
||||
fs.list.append(namedtuple('MockFieldStorage', 'name')('fieldvalue'))
|
||||
self.assertTrue(fs)
|
||||
|
||||
def test_fieldstorage_invalid(self):
|
||||
self.assertRaises(TypeError, cgi.FieldStorage, "not-a-file-obj",
|
||||
environ={"REQUEST_METHOD":"PUT"})
|
||||
self.assertRaises(TypeError, cgi.FieldStorage, "foo", "bar")
|
||||
fs = cgi.FieldStorage(headers={'content-type':'text/plain'})
|
||||
self.assertRaises(TypeError, bool, fs)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_strict(self):
|
||||
for orig, expect in parse_strict_test_cases:
|
||||
# Test basic parsing
|
||||
d = do_test(orig, "GET")
|
||||
self.assertEqual(d, expect, "Error parsing %s method GET" % repr(orig))
|
||||
d = do_test(orig, "POST")
|
||||
self.assertEqual(d, expect, "Error parsing %s method POST" % repr(orig))
|
||||
|
||||
env = {'QUERY_STRING': orig}
|
||||
fs = cgi.FieldStorage(environ=env)
|
||||
if isinstance(expect, dict):
|
||||
# test dict interface
|
||||
self.assertEqual(len(expect), len(fs))
|
||||
self.assertCountEqual(expect.keys(), fs.keys())
|
||||
##self.assertEqual(norm(expect.values()), norm(fs.values()))
|
||||
##self.assertEqual(norm(expect.items()), norm(fs.items()))
|
||||
self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
|
||||
# test individual fields
|
||||
for key in expect.keys():
|
||||
expect_val = expect[key]
|
||||
self.assertIn(key, fs)
|
||||
if len(expect_val) > 1:
|
||||
self.assertEqual(fs.getvalue(key), expect_val)
|
||||
else:
|
||||
self.assertEqual(fs.getvalue(key), expect_val[0])
|
||||
|
||||
def test_separator(self):
|
||||
parse_semicolon = [
|
||||
("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
|
||||
("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
|
||||
(";", ValueError("bad query field: ''")),
|
||||
(";;", ValueError("bad query field: ''")),
|
||||
("=;a", ValueError("bad query field: 'a'")),
|
||||
(";b=a", ValueError("bad query field: ''")),
|
||||
("b;=a", ValueError("bad query field: 'b'")),
|
||||
("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
|
||||
("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
|
||||
]
|
||||
for orig, expect in parse_semicolon:
|
||||
env = {'QUERY_STRING': orig}
|
||||
fs = cgi.FieldStorage(separator=';', environ=env)
|
||||
if isinstance(expect, dict):
|
||||
for key in expect.keys():
|
||||
expect_val = expect[key]
|
||||
self.assertIn(key, fs)
|
||||
if len(expect_val) > 1:
|
||||
self.assertEqual(fs.getvalue(key), expect_val)
|
||||
else:
|
||||
self.assertEqual(fs.getvalue(key), expect_val[0])
|
||||
|
||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||
def test_log(self):
|
||||
cgi.log("Testing")
|
||||
|
||||
cgi.logfp = StringIO()
|
||||
cgi.initlog("%s", "Testing initlog 1")
|
||||
cgi.log("%s", "Testing log 2")
|
||||
self.assertEqual(cgi.logfp.getvalue(), "Testing initlog 1\nTesting log 2\n")
|
||||
if os.path.exists(os.devnull):
|
||||
cgi.logfp = None
|
||||
cgi.logfile = os.devnull
|
||||
cgi.initlog("%s", "Testing log 3")
|
||||
self.addCleanup(cgi.closelog)
|
||||
cgi.log("Testing log 4")
|
||||
|
||||
def test_fieldstorage_readline(self):
|
||||
# FieldStorage uses readline, which has the capacity to read all
|
||||
# contents of the input file into memory; we use readline's size argument
|
||||
# to prevent that for files that do not contain any newlines in
|
||||
# non-GET/HEAD requests
|
||||
class TestReadlineFile:
|
||||
def __init__(self, file):
|
||||
self.file = file
|
||||
self.numcalls = 0
|
||||
|
||||
def readline(self, size=None):
|
||||
self.numcalls += 1
|
||||
if size:
|
||||
return self.file.readline(size)
|
||||
else:
|
||||
return self.file.readline()
|
||||
|
||||
def __getattr__(self, name):
|
||||
file = self.__dict__['file']
|
||||
a = getattr(file, name)
|
||||
if not isinstance(a, int):
|
||||
setattr(self, name, a)
|
||||
return a
|
||||
|
||||
f = TestReadlineFile(tempfile.TemporaryFile("wb+"))
|
||||
self.addCleanup(f.close)
|
||||
f.write(b'x' * 256 * 1024)
|
||||
f.seek(0)
|
||||
env = {'REQUEST_METHOD':'PUT'}
|
||||
fs = cgi.FieldStorage(fp=f, environ=env)
|
||||
self.addCleanup(fs.file.close)
|
||||
# if we're not chunking properly, readline is only called twice
|
||||
# (by read_binary); if we are chunking properly, it will be called 5 times
|
||||
# as long as the chunksize is 1 << 16.
|
||||
self.assertGreater(f.numcalls, 2)
|
||||
f.close()
|
||||
|
||||
def test_fieldstorage_multipart(self):
|
||||
#Test basic FieldStorage multipart parsing
|
||||
env = {
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
|
||||
'CONTENT_LENGTH': '558'}
|
||||
fp = BytesIO(POSTDATA.encode('latin-1'))
|
||||
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
|
||||
self.assertEqual(len(fs.list), 4)
|
||||
expect = [{'name':'id', 'filename':None, 'value':'1234'},
|
||||
{'name':'title', 'filename':None, 'value':''},
|
||||
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
|
||||
{'name':'submit', 'filename':None, 'value':' Add '}]
|
||||
for x in range(len(fs.list)):
|
||||
for k, exp in expect[x].items():
|
||||
got = getattr(fs.list[x], k)
|
||||
self.assertEqual(got, exp)
|
||||
|
||||
def test_fieldstorage_multipart_leading_whitespace(self):
|
||||
env = {
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
|
||||
'CONTENT_LENGTH': '560'}
|
||||
# Add some leading whitespace to our post data that will cause the
|
||||
# first line to not be the innerboundary.
|
||||
fp = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
|
||||
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
|
||||
self.assertEqual(len(fs.list), 4)
|
||||
expect = [{'name':'id', 'filename':None, 'value':'1234'},
|
||||
{'name':'title', 'filename':None, 'value':''},
|
||||
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
|
||||
{'name':'submit', 'filename':None, 'value':' Add '}]
|
||||
for x in range(len(fs.list)):
|
||||
for k, exp in expect[x].items():
|
||||
got = getattr(fs.list[x], k)
|
||||
self.assertEqual(got, exp)
|
||||
|
||||
def test_fieldstorage_multipart_non_ascii(self):
|
||||
#Test basic FieldStorage multipart parsing
|
||||
env = {'REQUEST_METHOD':'POST',
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
|
||||
'CONTENT_LENGTH':'558'}
|
||||
for encoding in ['iso-8859-1','utf-8']:
|
||||
fp = BytesIO(POSTDATA_NON_ASCII.encode(encoding))
|
||||
fs = cgi.FieldStorage(fp, environ=env,encoding=encoding)
|
||||
self.assertEqual(len(fs.list), 1)
|
||||
expect = [{'name':'id', 'filename':None, 'value':'\xe7\xf1\x80'}]
|
||||
for x in range(len(fs.list)):
|
||||
for k, exp in expect[x].items():
|
||||
got = getattr(fs.list[x], k)
|
||||
self.assertEqual(got, exp)
|
||||
|
||||
def test_fieldstorage_multipart_maxline(self):
|
||||
# Issue #18167
|
||||
maxline = 1 << 16
|
||||
self.maxDiff = None
|
||||
def check(content):
|
||||
data = """---123
|
||||
Content-Disposition: form-data; name="upload"; filename="fake.txt"
|
||||
Content-Type: text/plain
|
||||
|
||||
%s
|
||||
---123--
|
||||
""".replace('\n', '\r\n') % content
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
self.assertEqual(gen_result(data, environ),
|
||||
{'upload': content.encode('latin1')})
|
||||
check('x' * (maxline - 1))
|
||||
check('x' * (maxline - 1) + '\r')
|
||||
check('x' * (maxline - 1) + '\r' + 'y' * (maxline - 1))
|
||||
|
||||
def test_fieldstorage_multipart_w3c(self):
|
||||
# Test basic FieldStorage multipart parsing (W3C sample)
|
||||
env = {
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
|
||||
'CONTENT_LENGTH': str(len(POSTDATA_W3))}
|
||||
fp = BytesIO(POSTDATA_W3.encode('latin-1'))
|
||||
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
|
||||
self.assertEqual(len(fs.list), 2)
|
||||
self.assertEqual(fs.list[0].name, 'submit-name')
|
||||
self.assertEqual(fs.list[0].value, 'Larry')
|
||||
self.assertEqual(fs.list[1].name, 'files')
|
||||
files = fs.list[1].value
|
||||
self.assertEqual(len(files), 2)
|
||||
expect = [{'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
|
||||
{'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'}]
|
||||
for x in range(len(files)):
|
||||
for k, exp in expect[x].items():
|
||||
got = getattr(files[x], k)
|
||||
self.assertEqual(got, exp)
|
||||
|
||||
def test_fieldstorage_part_content_length(self):
|
||||
BOUNDARY = "JfISa01"
|
||||
POSTDATA = """--JfISa01
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
Content-Length: 5
|
||||
|
||||
Larry
|
||||
--JfISa01"""
|
||||
env = {
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
|
||||
'CONTENT_LENGTH': str(len(POSTDATA))}
|
||||
fp = BytesIO(POSTDATA.encode('latin-1'))
|
||||
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
|
||||
self.assertEqual(len(fs.list), 1)
|
||||
self.assertEqual(fs.list[0].name, 'submit-name')
|
||||
self.assertEqual(fs.list[0].value, 'Larry')
|
||||
|
||||
def test_field_storage_multipart_no_content_length(self):
|
||||
fp = BytesIO(b"""--MyBoundary
|
||||
Content-Disposition: form-data; name="my-arg"; filename="foo"
|
||||
|
||||
Test
|
||||
|
||||
--MyBoundary--
|
||||
""")
|
||||
env = {
|
||||
"REQUEST_METHOD": "POST",
|
||||
"CONTENT_TYPE": "multipart/form-data; boundary=MyBoundary",
|
||||
"wsgi.input": fp,
|
||||
}
|
||||
fields = cgi.FieldStorage(fp, environ=env)
|
||||
|
||||
self.assertEqual(len(fields["my-arg"].file.read()), 5)
|
||||
|
||||
def test_fieldstorage_as_context_manager(self):
|
||||
fp = BytesIO(b'x' * 10)
|
||||
env = {'REQUEST_METHOD': 'PUT'}
|
||||
with cgi.FieldStorage(fp=fp, environ=env) as fs:
|
||||
content = fs.file.read()
|
||||
self.assertFalse(fs.file.closed)
|
||||
self.assertTrue(fs.file.closed)
|
||||
self.assertEqual(content, 'x' * 10)
|
||||
with self.assertRaisesRegex(ValueError, 'I/O operation on closed file'):
|
||||
fs.file.read()
|
||||
|
||||
_qs_result = {
|
||||
'key1': 'value1',
|
||||
'key2': ['value2x', 'value2y'],
|
||||
'key3': 'value3',
|
||||
'key4': 'value4'
|
||||
}
|
||||
def testQSAndUrlEncode(self):
|
||||
data = "key2=value2x&key3=value3&key4=value4"
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
|
||||
'QUERY_STRING': 'key1=value1&key2=value2y',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
v = gen_result(data, environ)
|
||||
self.assertEqual(self._qs_result, v)
|
||||
|
||||
def test_max_num_fields(self):
|
||||
# For application/x-www-form-urlencoded
|
||||
data = '&'.join(['a=a']*11)
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
cgi.FieldStorage(
|
||||
fp=BytesIO(data.encode()),
|
||||
environ=environ,
|
||||
max_num_fields=10,
|
||||
)
|
||||
|
||||
# For multipart/form-data
|
||||
data = """---123
|
||||
Content-Disposition: form-data; name="a"
|
||||
|
||||
3
|
||||
---123
|
||||
Content-Type: application/x-www-form-urlencoded
|
||||
|
||||
a=4
|
||||
---123
|
||||
Content-Type: application/x-www-form-urlencoded
|
||||
|
||||
a=5
|
||||
---123--
|
||||
"""
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
|
||||
'QUERY_STRING': 'a=1&a=2',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
|
||||
# 2 GET entities
|
||||
# 1 top level POST entities
|
||||
# 1 entity within the second POST entity
|
||||
# 1 entity within the third POST entity
|
||||
with self.assertRaises(ValueError):
|
||||
cgi.FieldStorage(
|
||||
fp=BytesIO(data.encode()),
|
||||
environ=environ,
|
||||
max_num_fields=4,
|
||||
)
|
||||
cgi.FieldStorage(
|
||||
fp=BytesIO(data.encode()),
|
||||
environ=environ,
|
||||
max_num_fields=5,
|
||||
)
|
||||
|
||||
def testQSAndFormData(self):
|
||||
data = """---123
|
||||
Content-Disposition: form-data; name="key2"
|
||||
|
||||
value2y
|
||||
---123
|
||||
Content-Disposition: form-data; name="key3"
|
||||
|
||||
value3
|
||||
---123
|
||||
Content-Disposition: form-data; name="key4"
|
||||
|
||||
value4
|
||||
---123--
|
||||
"""
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
|
||||
'QUERY_STRING': 'key1=value1&key2=value2x',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
v = gen_result(data, environ)
|
||||
self.assertEqual(self._qs_result, v)
|
||||
|
||||
def testQSAndFormDataFile(self):
|
||||
data = """---123
|
||||
Content-Disposition: form-data; name="key2"
|
||||
|
||||
value2y
|
||||
---123
|
||||
Content-Disposition: form-data; name="key3"
|
||||
|
||||
value3
|
||||
---123
|
||||
Content-Disposition: form-data; name="key4"
|
||||
|
||||
value4
|
||||
---123
|
||||
Content-Disposition: form-data; name="upload"; filename="fake.txt"
|
||||
Content-Type: text/plain
|
||||
|
||||
this is the content of the fake file
|
||||
|
||||
---123--
|
||||
"""
|
||||
environ = {
|
||||
'CONTENT_LENGTH': str(len(data)),
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
|
||||
'QUERY_STRING': 'key1=value1&key2=value2x',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
}
|
||||
result = self._qs_result.copy()
|
||||
result.update({
|
||||
'upload': b'this is the content of the fake file\n'
|
||||
})
|
||||
v = gen_result(data, environ)
|
||||
self.assertEqual(result, v)
|
||||
|
||||
def test_parse_header(self):
|
||||
self.assertEqual(
|
||||
cgi.parse_header("text/plain"),
|
||||
("text/plain", {}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header("text/vnd.just.made.this.up ; "),
|
||||
("text/vnd.just.made.this.up", {}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header("text/plain;charset=us-ascii"),
|
||||
("text/plain", {"charset": "us-ascii"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('text/plain ; charset="us-ascii"'),
|
||||
("text/plain", {"charset": "us-ascii"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('text/plain ; charset="us-ascii"; another=opt'),
|
||||
("text/plain", {"charset": "us-ascii", "another": "opt"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('attachment; filename="silly.txt"'),
|
||||
("attachment", {"filename": "silly.txt"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('attachment; filename="strange;name"'),
|
||||
("attachment", {"filename": "strange;name"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('attachment; filename="strange;name";size=123;'),
|
||||
("attachment", {"filename": "strange;name", "size": "123"}))
|
||||
self.assertEqual(
|
||||
cgi.parse_header('form-data; name="files"; filename="fo\\"o;bar"'),
|
||||
("form-data", {"name": "files", "filename": 'fo"o;bar'}))
|
||||
|
||||
def test_all(self):
|
||||
not_exported = {
|
||||
"logfile", "logfp", "initlog", "dolog", "nolog", "closelog", "log",
|
||||
"maxlen", "valid_boundary"}
|
||||
support.check__all__(self, cgi, not_exported=not_exported)
|
||||
|
||||
|
||||
BOUNDARY = "---------------------------721837373350705526688164684"
|
||||
|
||||
POSTDATA = """-----------------------------721837373350705526688164684
|
||||
Content-Disposition: form-data; name="id"
|
||||
|
||||
1234
|
||||
-----------------------------721837373350705526688164684
|
||||
Content-Disposition: form-data; name="title"
|
||||
|
||||
|
||||
-----------------------------721837373350705526688164684
|
||||
Content-Disposition: form-data; name="file"; filename="test.txt"
|
||||
Content-Type: text/plain
|
||||
|
||||
Testing 123.
|
||||
|
||||
-----------------------------721837373350705526688164684
|
||||
Content-Disposition: form-data; name="submit"
|
||||
|
||||
Add\x20
|
||||
-----------------------------721837373350705526688164684--
|
||||
"""
|
||||
|
||||
POSTDATA_NON_ASCII = """-----------------------------721837373350705526688164684
|
||||
Content-Disposition: form-data; name="id"
|
||||
|
||||
\xe7\xf1\x80
|
||||
-----------------------------721837373350705526688164684
|
||||
"""
|
||||
|
||||
# http://www.w3.org/TR/html401/interact/forms.html#h-17.13.4
|
||||
BOUNDARY_W3 = "AaB03x"
|
||||
POSTDATA_W3 = """--AaB03x
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
|
||||
Larry
|
||||
--AaB03x
|
||||
Content-Disposition: form-data; name="files"
|
||||
Content-Type: multipart/mixed; boundary=BbC04y
|
||||
|
||||
--BbC04y
|
||||
Content-Disposition: file; filename="file1.txt"
|
||||
Content-Type: text/plain
|
||||
|
||||
... contents of file1.txt ...
|
||||
--BbC04y
|
||||
Content-Disposition: file; filename="file2.gif"
|
||||
Content-Type: image/gif
|
||||
Content-Transfer-Encoding: binary
|
||||
|
||||
...contents of file2.gif...
|
||||
--BbC04y--
|
||||
--AaB03x--
|
||||
"""
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
1
Lib/test/test_httpservers.py
vendored
1
Lib/test/test_httpservers.py
vendored
@@ -778,6 +778,7 @@ class CGIHTTPServerTestCase(BaseTestCase):
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON; works only on windows")
|
||||
@unittest.expectedFailure
|
||||
def test_post(self):
|
||||
params = urllib.parse.urlencode(
|
||||
{'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
|
||||
|
||||
258
Lib/test/test_uu.py
vendored
258
Lib/test/test_uu.py
vendored
@@ -1,258 +0,0 @@
|
||||
"""
|
||||
Tests for uu module.
|
||||
Nick Mathewson
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from test.support import os_helper
|
||||
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import uu
|
||||
import io
|
||||
|
||||
plaintext = b"The symbols on top of your keyboard are !@#$%^&*()_+|~\n"
|
||||
|
||||
encodedtext = b"""\
|
||||
M5&AE('-Y;6)O;',@;VX@=&]P(&]F('EO=7(@:V5Y8F]A<F0@87)E("% (R0E
|
||||
*7B8J*"E?*WQ^"@ """
|
||||
|
||||
# Stolen from io.py
|
||||
class FakeIO(io.TextIOWrapper):
|
||||
"""Text I/O implementation using an in-memory buffer.
|
||||
|
||||
Can be a used as a drop-in replacement for sys.stdin and sys.stdout.
|
||||
"""
|
||||
|
||||
# XXX This is really slow, but fully functional
|
||||
|
||||
def __init__(self, initial_value="", encoding="utf-8",
|
||||
errors="strict", newline="\n"):
|
||||
super(FakeIO, self).__init__(io.BytesIO(),
|
||||
encoding=encoding,
|
||||
errors=errors,
|
||||
newline=newline)
|
||||
self._encoding = encoding
|
||||
self._errors = errors
|
||||
if initial_value:
|
||||
if not isinstance(initial_value, str):
|
||||
initial_value = str(initial_value)
|
||||
self.write(initial_value)
|
||||
self.seek(0)
|
||||
|
||||
def getvalue(self):
|
||||
self.flush()
|
||||
return self.buffer.getvalue().decode(self._encoding, self._errors)
|
||||
|
||||
|
||||
def encodedtextwrapped(mode, filename, backtick=False):
|
||||
if backtick:
|
||||
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
|
||||
encodedtext.replace(b' ', b'`') + b"\n`\nend\n")
|
||||
else:
|
||||
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
|
||||
encodedtext + b"\n \nend\n")
|
||||
return res
|
||||
|
||||
class UUTest(unittest.TestCase):
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_encode(self):
|
||||
inp = io.BytesIO(plaintext)
|
||||
out = io.BytesIO()
|
||||
uu.encode(inp, out, "t1")
|
||||
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1"))
|
||||
inp = io.BytesIO(plaintext)
|
||||
out = io.BytesIO()
|
||||
uu.encode(inp, out, "t1", 0o644)
|
||||
self.assertEqual(out.getvalue(), encodedtextwrapped(0o644, "t1"))
|
||||
inp = io.BytesIO(plaintext)
|
||||
out = io.BytesIO()
|
||||
uu.encode(inp, out, "t1", backtick=True)
|
||||
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1", True))
|
||||
with self.assertRaises(TypeError):
|
||||
uu.encode(inp, out, "t1", 0o644, True)
|
||||
|
||||
def test_decode(self):
|
||||
for backtick in True, False:
|
||||
inp = io.BytesIO(encodedtextwrapped(0o666, "t1", backtick=backtick))
|
||||
out = io.BytesIO()
|
||||
uu.decode(inp, out)
|
||||
self.assertEqual(out.getvalue(), plaintext)
|
||||
inp = io.BytesIO(
|
||||
b"UUencoded files may contain many lines,\n" +
|
||||
b"even some that have 'begin' in them.\n" +
|
||||
encodedtextwrapped(0o666, "t1", backtick=backtick)
|
||||
)
|
||||
out = io.BytesIO()
|
||||
uu.decode(inp, out)
|
||||
self.assertEqual(out.getvalue(), plaintext)
|
||||
|
||||
def test_truncatedinput(self):
|
||||
inp = io.BytesIO(b"begin 644 t1\n" + encodedtext)
|
||||
out = io.BytesIO()
|
||||
try:
|
||||
uu.decode(inp, out)
|
||||
self.fail("No exception raised")
|
||||
except uu.Error as e:
|
||||
self.assertEqual(str(e), "Truncated input file")
|
||||
|
||||
def test_missingbegin(self):
|
||||
inp = io.BytesIO(b"")
|
||||
out = io.BytesIO()
|
||||
try:
|
||||
uu.decode(inp, out)
|
||||
self.fail("No exception raised")
|
||||
except uu.Error as e:
|
||||
self.assertEqual(str(e), "No valid begin line found in input file")
|
||||
|
||||
def test_garbage_padding(self):
|
||||
# Issue #22406
|
||||
encodedtext1 = (
|
||||
b"begin 644 file\n"
|
||||
# length 1; bits 001100 111111 111111 111111
|
||||
b"\x21\x2C\x5F\x5F\x5F\n"
|
||||
b"\x20\n"
|
||||
b"end\n"
|
||||
)
|
||||
encodedtext2 = (
|
||||
b"begin 644 file\n"
|
||||
# length 1; bits 001100 111111 111111 111111
|
||||
b"\x21\x2C\x5F\x5F\x5F\n"
|
||||
b"\x60\n"
|
||||
b"end\n"
|
||||
)
|
||||
plaintext = b"\x33" # 00110011
|
||||
|
||||
for encodedtext in encodedtext1, encodedtext2:
|
||||
with self.subTest("uu.decode()"):
|
||||
inp = io.BytesIO(encodedtext)
|
||||
out = io.BytesIO()
|
||||
uu.decode(inp, out, quiet=True)
|
||||
self.assertEqual(out.getvalue(), plaintext)
|
||||
|
||||
with self.subTest("uu_codec"):
|
||||
import codecs
|
||||
decoded = codecs.decode(encodedtext, "uu_codec")
|
||||
self.assertEqual(decoded, plaintext)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_newlines_escaped(self):
|
||||
# Test newlines are escaped with uu.encode
|
||||
inp = io.BytesIO(plaintext)
|
||||
out = io.BytesIO()
|
||||
filename = "test.txt\n\roverflow.txt"
|
||||
safefilename = b"test.txt\\n\\roverflow.txt"
|
||||
uu.encode(inp, out, filename)
|
||||
self.assertIn(safefilename, out.getvalue())
|
||||
|
||||
class UUStdIOTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.stdin = sys.stdin
|
||||
self.stdout = sys.stdout
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdin = self.stdin
|
||||
sys.stdout = self.stdout
|
||||
|
||||
def test_encode(self):
|
||||
sys.stdin = FakeIO(plaintext.decode("ascii"))
|
||||
sys.stdout = FakeIO()
|
||||
uu.encode("-", "-", "t1", 0o666)
|
||||
self.assertEqual(sys.stdout.getvalue(),
|
||||
encodedtextwrapped(0o666, "t1").decode("ascii"))
|
||||
|
||||
def test_decode(self):
|
||||
sys.stdin = FakeIO(encodedtextwrapped(0o666, "t1").decode("ascii"))
|
||||
sys.stdout = FakeIO()
|
||||
uu.decode("-", "-")
|
||||
stdout = sys.stdout
|
||||
sys.stdout = self.stdout
|
||||
sys.stdin = self.stdin
|
||||
self.assertEqual(stdout.getvalue(), plaintext.decode("ascii"))
|
||||
|
||||
class UUFileTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# uu.encode() supports only ASCII file names
|
||||
self.tmpin = os_helper.TESTFN_ASCII + "i"
|
||||
self.tmpout = os_helper.TESTFN_ASCII + "o"
|
||||
self.addCleanup(os_helper.unlink, self.tmpin)
|
||||
self.addCleanup(os_helper.unlink, self.tmpout)
|
||||
|
||||
def test_encode(self):
|
||||
with open(self.tmpin, 'wb') as fin:
|
||||
fin.write(plaintext)
|
||||
|
||||
with open(self.tmpin, 'rb') as fin:
|
||||
with open(self.tmpout, 'wb') as fout:
|
||||
uu.encode(fin, fout, self.tmpin, mode=0o644)
|
||||
|
||||
with open(self.tmpout, 'rb') as fout:
|
||||
s = fout.read()
|
||||
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
|
||||
|
||||
# in_file and out_file as filenames
|
||||
uu.encode(self.tmpin, self.tmpout, self.tmpin, mode=0o644)
|
||||
with open(self.tmpout, 'rb') as fout:
|
||||
s = fout.read()
|
||||
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
|
||||
|
||||
def test_decode(self):
|
||||
with open(self.tmpin, 'wb') as f:
|
||||
f.write(encodedtextwrapped(0o644, self.tmpout))
|
||||
|
||||
with open(self.tmpin, 'rb') as f:
|
||||
uu.decode(f)
|
||||
|
||||
with open(self.tmpout, 'rb') as f:
|
||||
s = f.read()
|
||||
self.assertEqual(s, plaintext)
|
||||
# XXX is there an xp way to verify the mode?
|
||||
|
||||
def test_decode_filename(self):
|
||||
with open(self.tmpin, 'wb') as f:
|
||||
f.write(encodedtextwrapped(0o644, self.tmpout))
|
||||
|
||||
uu.decode(self.tmpin)
|
||||
|
||||
with open(self.tmpout, 'rb') as f:
|
||||
s = f.read()
|
||||
self.assertEqual(s, plaintext)
|
||||
|
||||
def test_decodetwice(self):
|
||||
# Verify that decode() will refuse to overwrite an existing file
|
||||
with open(self.tmpin, 'wb') as f:
|
||||
f.write(encodedtextwrapped(0o644, self.tmpout))
|
||||
with open(self.tmpin, 'rb') as f:
|
||||
uu.decode(f)
|
||||
|
||||
with open(self.tmpin, 'rb') as f:
|
||||
self.assertRaises(uu.Error, uu.decode, f)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_decode_mode(self):
|
||||
# Verify that decode() will set the given mode for the out_file
|
||||
expected_mode = 0o444
|
||||
with open(self.tmpin, 'wb') as f:
|
||||
f.write(encodedtextwrapped(expected_mode, self.tmpout))
|
||||
|
||||
# make file writable again, so it can be removed (Windows only)
|
||||
self.addCleanup(os.chmod, self.tmpout, expected_mode | stat.S_IWRITE)
|
||||
|
||||
with open(self.tmpin, 'rb') as f:
|
||||
uu.decode(f)
|
||||
|
||||
self.assertEqual(
|
||||
stat.S_IMODE(os.stat(self.tmpout).st_mode),
|
||||
expected_mode
|
||||
)
|
||||
|
||||
|
||||
if __name__=="__main__":
|
||||
unittest.main()
|
||||
199
Lib/uu.py
vendored
199
Lib/uu.py
vendored
@@ -1,199 +0,0 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
# Copyright 1994 by Lance Ellinghouse
|
||||
# Cathedral City, California Republic, United States of America.
|
||||
# All Rights Reserved
|
||||
# Permission to use, copy, modify, and distribute this software and its
|
||||
# documentation for any purpose and without fee is hereby granted,
|
||||
# provided that the above copyright notice appear in all copies and that
|
||||
# both that copyright notice and this permission notice appear in
|
||||
# supporting documentation, and that the name of Lance Ellinghouse
|
||||
# not be used in advertising or publicity pertaining to distribution
|
||||
# of the software without specific, written prior permission.
|
||||
# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
|
||||
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
|
||||
# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
|
||||
# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
|
||||
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
#
|
||||
# Modified by Jack Jansen, CWI, July 1995:
|
||||
# - Use binascii module to do the actual line-by-line conversion
|
||||
# between ascii and binary. This results in a 1000-fold speedup. The C
|
||||
# version is still 5 times faster, though.
|
||||
# - Arguments more compliant with python standard
|
||||
|
||||
"""Implementation of the UUencode and UUdecode functions.
|
||||
|
||||
encode(in_file, out_file [,name, mode])
|
||||
decode(in_file [, out_file, mode])
|
||||
"""
|
||||
|
||||
import binascii
|
||||
import os
|
||||
import sys
|
||||
|
||||
__all__ = ["Error", "encode", "decode"]
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
def encode(in_file, out_file, name=None, mode=None):
|
||||
"""Uuencode file"""
|
||||
#
|
||||
# If in_file is a pathname open it and change defaults
|
||||
#
|
||||
opened_files = []
|
||||
try:
|
||||
if in_file == '-':
|
||||
in_file = sys.stdin.buffer
|
||||
elif isinstance(in_file, str):
|
||||
if name is None:
|
||||
name = os.path.basename(in_file)
|
||||
if mode is None:
|
||||
try:
|
||||
mode = os.stat(in_file).st_mode
|
||||
except AttributeError:
|
||||
pass
|
||||
in_file = open(in_file, 'rb')
|
||||
opened_files.append(in_file)
|
||||
#
|
||||
# Open out_file if it is a pathname
|
||||
#
|
||||
if out_file == '-':
|
||||
out_file = sys.stdout.buffer
|
||||
elif isinstance(out_file, str):
|
||||
out_file = open(out_file, 'wb')
|
||||
opened_files.append(out_file)
|
||||
#
|
||||
# Set defaults for name and mode
|
||||
#
|
||||
if name is None:
|
||||
name = '-'
|
||||
if mode is None:
|
||||
mode = 0o666
|
||||
#
|
||||
# Write the data
|
||||
#
|
||||
out_file.write(('begin %o %s\n' % ((mode & 0o777), name)).encode("ascii"))
|
||||
data = in_file.read(45)
|
||||
while len(data) > 0:
|
||||
out_file.write(binascii.b2a_uu(data))
|
||||
data = in_file.read(45)
|
||||
out_file.write(b' \nend\n')
|
||||
finally:
|
||||
for f in opened_files:
|
||||
f.close()
|
||||
|
||||
|
||||
def decode(in_file, out_file=None, mode=None, quiet=False):
|
||||
"""Decode uuencoded file"""
|
||||
#
|
||||
# Open the input file, if needed.
|
||||
#
|
||||
opened_files = []
|
||||
if in_file == '-':
|
||||
in_file = sys.stdin.buffer
|
||||
elif isinstance(in_file, str):
|
||||
in_file = open(in_file, 'rb')
|
||||
opened_files.append(in_file)
|
||||
|
||||
try:
|
||||
#
|
||||
# Read until a begin is encountered or we've exhausted the file
|
||||
#
|
||||
while True:
|
||||
hdr = in_file.readline()
|
||||
if not hdr:
|
||||
raise Error('No valid begin line found in input file')
|
||||
if not hdr.startswith(b'begin'):
|
||||
continue
|
||||
hdrfields = hdr.split(b' ', 2)
|
||||
if len(hdrfields) == 3 and hdrfields[0] == b'begin':
|
||||
try:
|
||||
int(hdrfields[1], 8)
|
||||
break
|
||||
except ValueError:
|
||||
pass
|
||||
if out_file is None:
|
||||
# If the filename isn't ASCII, what's up with that?!?
|
||||
out_file = hdrfields[2].rstrip(b' \t\r\n\f').decode("ascii")
|
||||
if os.path.exists(out_file):
|
||||
raise Error('Cannot overwrite existing file: %s' % out_file)
|
||||
if mode is None:
|
||||
mode = int(hdrfields[1], 8)
|
||||
#
|
||||
# Open the output file
|
||||
#
|
||||
if out_file == '-':
|
||||
out_file = sys.stdout.buffer
|
||||
elif isinstance(out_file, str):
|
||||
fp = open(out_file, 'wb')
|
||||
try:
|
||||
os.path.chmod(out_file, mode)
|
||||
except AttributeError:
|
||||
pass
|
||||
out_file = fp
|
||||
opened_files.append(out_file)
|
||||
#
|
||||
# Main decoding loop
|
||||
#
|
||||
s = in_file.readline()
|
||||
while s and s.strip(b' \t\r\n\f') != b'end':
|
||||
try:
|
||||
data = binascii.a2b_uu(s)
|
||||
except binascii.Error as v:
|
||||
# Workaround for broken uuencoders by /Fredrik Lundh
|
||||
nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
|
||||
data = binascii.a2b_uu(s[:nbytes])
|
||||
if not quiet:
|
||||
sys.stderr.write("Warning: %s\n" % v)
|
||||
out_file.write(data)
|
||||
s = in_file.readline()
|
||||
if not s:
|
||||
raise Error('Truncated input file')
|
||||
finally:
|
||||
for f in opened_files:
|
||||
f.close()
|
||||
|
||||
def test():
|
||||
"""uuencode/uudecode main program"""
|
||||
|
||||
import optparse
|
||||
parser = optparse.OptionParser(usage='usage: %prog [-d] [-t] [input [output]]')
|
||||
parser.add_option('-d', '--decode', dest='decode', help='Decode (instead of encode)?', default=False, action='store_true')
|
||||
parser.add_option('-t', '--text', dest='text', help='data is text, encoded format unix-compatible text?', default=False, action='store_true')
|
||||
|
||||
(options, args) = parser.parse_args()
|
||||
if len(args) > 2:
|
||||
parser.error('incorrect number of arguments')
|
||||
sys.exit(1)
|
||||
|
||||
# Use the binary streams underlying stdin/stdout
|
||||
input = sys.stdin.buffer
|
||||
output = sys.stdout.buffer
|
||||
if len(args) > 0:
|
||||
input = args[0]
|
||||
if len(args) > 1:
|
||||
output = args[1]
|
||||
|
||||
if options.decode:
|
||||
if options.text:
|
||||
if isinstance(output, str):
|
||||
output = open(output, 'wb')
|
||||
else:
|
||||
print(sys.argv[0], ': cannot do -t to stdout')
|
||||
sys.exit(1)
|
||||
decode(input, output)
|
||||
else:
|
||||
if options.text:
|
||||
if isinstance(input, str):
|
||||
input = open(input, 'rb')
|
||||
else:
|
||||
print(sys.argv[0], ': cannot do -t from stdin')
|
||||
sys.exit(1)
|
||||
encode(input, output)
|
||||
|
||||
if __name__ == '__main__':
|
||||
test()
|
||||
@@ -81,7 +81,7 @@ if sys.platform.startswith("win"):
|
||||
0x00000100 | 0x00000001 | 0x00000020 | 0x00002000 | 0x00000010 | 0x00008000 | 0x00020000
|
||||
|
||||
# We really can't test if the results are correct, so it just checks for meaningful value
|
||||
assert winver.major > 0
|
||||
assert winver.major > 6
|
||||
assert winver.minor >= 0
|
||||
assert winver.build > 0
|
||||
assert winver.platform == 2
|
||||
@@ -91,8 +91,8 @@ if sys.platform.startswith("win"):
|
||||
|
||||
# XXX if platform_version is implemented correctly, this'll break on compatiblity mode or a build without manifest
|
||||
# these fields can mismatch in CPython
|
||||
# assert winver.major == winver.platform_version[0]
|
||||
# assert winver.minor == winver.platform_version[1]
|
||||
assert winver.major == winver.platform_version[0]
|
||||
assert winver.minor == winver.platform_version[1]
|
||||
# assert winver.build == winver.platform_version[2]
|
||||
|
||||
# test int_max_str_digits getter and setter
|
||||
|
||||
@@ -31,10 +31,11 @@ mod _overlapped {
|
||||
},
|
||||
System::Threading::INFINITE,
|
||||
};
|
||||
#[pyattr(once)]
|
||||
fn INVALID_HANDLE_VALUE(_vm: &VirtualMachine) -> isize {
|
||||
windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE as isize
|
||||
}
|
||||
|
||||
#[pyattr]
|
||||
const INVALID_HANDLE_VALUE: isize =
|
||||
unsafe { std::mem::transmute(windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE) };
|
||||
|
||||
#[pyattr]
|
||||
const NULL: isize = 0;
|
||||
|
||||
@@ -263,7 +264,7 @@ mod _overlapped {
|
||||
type Args = (isize,);
|
||||
|
||||
fn py_new(cls: PyTypeRef, (mut event,): Self::Args, vm: &VirtualMachine) -> PyResult {
|
||||
if event == INVALID_HANDLE_VALUE(vm) {
|
||||
if event == INVALID_HANDLE_VALUE {
|
||||
event = unsafe {
|
||||
windows_sys::Win32::System::Threading::CreateEventA(
|
||||
std::ptr::null(),
|
||||
|
||||
@@ -22,12 +22,23 @@ mod sys {
|
||||
vm::{Settings, VirtualMachine},
|
||||
};
|
||||
use num_traits::ToPrimitive;
|
||||
#[cfg(windows)]
|
||||
use std::os::windows::ffi::OsStrExt;
|
||||
use std::{
|
||||
env::{self, VarError},
|
||||
path,
|
||||
sync::atomic::Ordering,
|
||||
};
|
||||
|
||||
#[cfg(windows)]
|
||||
use windows_sys::Win32::{
|
||||
Foundation::MAX_PATH,
|
||||
Storage::FileSystem::{
|
||||
GetFileVersionInfoSizeW, GetFileVersionInfoW, VS_FIXEDFILEINFO, VerQueryValueW,
|
||||
},
|
||||
System::LibraryLoader::{GetModuleFileNameW, GetModuleHandleW},
|
||||
};
|
||||
|
||||
// not the same as CPython (e.g. rust's x86_x64-unknown-linux-gnu is just x86_64-linux-gnu)
|
||||
// but hopefully that's just an implementation detail? TODO: copy CPython's multiarch exactly,
|
||||
// https://github.com/python/cpython/blob/3.8/configure.ac#L725
|
||||
@@ -485,6 +496,78 @@ mod sys {
|
||||
vm.trace_func.borrow().clone()
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn get_kernel32_version() -> std::io::Result<(u32, u32, u32)> {
|
||||
unsafe {
|
||||
// Create a wide string for "kernel32.dll"
|
||||
let module_name: Vec<u16> = std::ffi::OsStr::new("kernel32.dll")
|
||||
.encode_wide()
|
||||
.chain(Some(0))
|
||||
.collect();
|
||||
let h_kernel32 = GetModuleHandleW(module_name.as_ptr());
|
||||
if h_kernel32.is_null() {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Prepare a buffer for the module file path
|
||||
let mut kernel32_path = [0u16; MAX_PATH as usize];
|
||||
let len = GetModuleFileNameW(
|
||||
h_kernel32,
|
||||
kernel32_path.as_mut_ptr(),
|
||||
kernel32_path.len() as u32,
|
||||
);
|
||||
if len == 0 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Get the size of the version information block
|
||||
let verblock_size =
|
||||
GetFileVersionInfoSizeW(kernel32_path.as_ptr(), std::ptr::null_mut());
|
||||
if verblock_size == 0 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Allocate a buffer to hold the version information
|
||||
let mut verblock = vec![0u8; verblock_size as usize];
|
||||
if GetFileVersionInfoW(
|
||||
kernel32_path.as_ptr(),
|
||||
0,
|
||||
verblock_size,
|
||||
verblock.as_mut_ptr() as *mut _,
|
||||
) == 0
|
||||
{
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Prepare an empty sub-block string (L"") as required by VerQueryValueW
|
||||
let sub_block: Vec<u16> = std::ffi::OsStr::new("")
|
||||
.encode_wide()
|
||||
.chain(Some(0))
|
||||
.collect();
|
||||
|
||||
let mut ffi_ptr: *mut VS_FIXEDFILEINFO = std::ptr::null_mut();
|
||||
let mut ffi_len: u32 = 0;
|
||||
if VerQueryValueW(
|
||||
verblock.as_ptr() as *const _,
|
||||
sub_block.as_ptr(),
|
||||
&mut ffi_ptr as *mut *mut VS_FIXEDFILEINFO as *mut *mut _,
|
||||
&mut ffi_len as *mut u32,
|
||||
) == 0
|
||||
|| ffi_ptr.is_null()
|
||||
{
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Extract the version numbers from the VS_FIXEDFILEINFO structure.
|
||||
let ffi = *ffi_ptr;
|
||||
let real_major = (ffi.dwProductVersionMS >> 16) & 0xFFFF;
|
||||
let real_minor = ffi.dwProductVersionMS & 0xFFFF;
|
||||
let real_build = (ffi.dwProductVersionLS >> 16) & 0xFFFF;
|
||||
|
||||
Ok((real_major, real_minor, real_build))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
#[pyfunction]
|
||||
fn getwindowsversion(vm: &VirtualMachine) -> PyResult<crate::builtins::tuple::PyTupleRef> {
|
||||
@@ -519,21 +602,18 @@ mod sys {
|
||||
sp.into_string()
|
||||
.map_err(|_| vm.new_os_error("service pack is not ASCII".to_owned()))?
|
||||
};
|
||||
let real_version = get_kernel32_version().map_err(|e| vm.new_os_error(e.to_string()))?;
|
||||
Ok(WindowsVersion {
|
||||
major: version.dwMajorVersion,
|
||||
minor: version.dwMinorVersion,
|
||||
build: version.dwBuildNumber,
|
||||
major: real_version.0,
|
||||
minor: real_version.1,
|
||||
build: real_version.2,
|
||||
platform: version.dwPlatformId,
|
||||
service_pack,
|
||||
service_pack_major: version.wServicePackMajor,
|
||||
service_pack_minor: version.wServicePackMinor,
|
||||
suite_mask: version.wSuiteMask,
|
||||
product_type: version.wProductType,
|
||||
platform_version: (
|
||||
version.dwMajorVersion,
|
||||
version.dwMinorVersion,
|
||||
version.dwBuildNumber,
|
||||
), // TODO Provide accurate version, like CPython impl
|
||||
platform_version: (real_version.0, real_version.1, real_version.2), // TODO Provide accurate version, like CPython impl
|
||||
}
|
||||
.into_struct_sequence(vm))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user