mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
PySSLCertificate (#6219)
This commit is contained in:
4
Lib/test/test_urllib2_localnet.py
vendored
4
Lib/test/test_urllib2_localnet.py
vendored
@@ -568,8 +568,6 @@ class TestUrlopen(unittest.TestCase):
|
||||
self.assertEqual(data, expected_response)
|
||||
self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
|
||||
def test_https(self):
|
||||
handler = self.start_https_server()
|
||||
@@ -577,8 +575,6 @@ class TestUrlopen(unittest.TestCase):
|
||||
data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
|
||||
self.assertEqual(data, b"we care a bit")
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON, ValueError: illegal environment variable name")
|
||||
def test_https_with_cafile(self):
|
||||
handler = self.start_https_server(certfile=CERT_localhost)
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
// spell-checker:disable
|
||||
|
||||
mod cert;
|
||||
|
||||
use crate::vm::{PyRef, VirtualMachine, builtins::PyModule};
|
||||
use openssl_probe::ProbeResult;
|
||||
|
||||
@@ -26,15 +28,12 @@ cfg_if::cfg_if! {
|
||||
}
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[pymodule(with(ossl101, ossl111, windows))]
|
||||
#[pymodule(with(cert::ssl_cert, ossl101, ossl111, windows))]
|
||||
mod _ssl {
|
||||
use super::{bio, probe};
|
||||
use crate::{
|
||||
common::{
|
||||
ascii,
|
||||
lock::{
|
||||
PyMappedRwLockReadGuard, PyMutex, PyRwLock, PyRwLockReadGuard, PyRwLockWriteGuard,
|
||||
},
|
||||
common::lock::{
|
||||
PyMappedRwLockReadGuard, PyMutex, PyRwLock, PyRwLockReadGuard, PyRwLockWriteGuard,
|
||||
},
|
||||
socket::{self, PySocket},
|
||||
vm::{
|
||||
@@ -43,7 +42,7 @@ mod _ssl {
|
||||
PyBaseExceptionRef, PyBytesRef, PyListRef, PyOSError, PyStrRef, PyTypeRef, PyWeak,
|
||||
},
|
||||
class_or_notimplemented,
|
||||
convert::{ToPyException, ToPyObject},
|
||||
convert::ToPyException,
|
||||
exceptions,
|
||||
function::{
|
||||
ArgBytesLike, ArgCallable, ArgMemoryBuffer, ArgStrOrBytesLike, Either, FsPath,
|
||||
@@ -60,7 +59,7 @@ mod _ssl {
|
||||
error::ErrorStack,
|
||||
nid::Nid,
|
||||
ssl::{self, SslContextBuilder, SslOptions, SslVerifyMode},
|
||||
x509::{self, X509, X509Ref},
|
||||
x509::X509,
|
||||
};
|
||||
use openssl_sys as sys;
|
||||
use rustpython_vm::ospath::OsPath;
|
||||
@@ -73,6 +72,14 @@ mod _ssl {
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
// Import certificate types from parent module
|
||||
use super::cert::{self, cert_to_certificate, cert_to_py};
|
||||
|
||||
// Re-export PySSLCertificate to make it available in the _ssl module
|
||||
// It will be automatically exposed to Python via #[pyclass]
|
||||
#[allow(unused_imports)]
|
||||
use super::cert::PySSLCertificate;
|
||||
|
||||
// Constants
|
||||
#[pyattr]
|
||||
use sys::{
|
||||
@@ -178,6 +185,18 @@ mod _ssl {
|
||||
#[pyattr]
|
||||
const HAS_PSK: bool = true;
|
||||
|
||||
// Encoding constants for Certificate.public_bytes()
|
||||
#[pyattr]
|
||||
pub(crate) const ENCODING_PEM: i32 = sys::X509_FILETYPE_PEM;
|
||||
#[pyattr]
|
||||
pub(crate) const ENCODING_DER: i32 = sys::X509_FILETYPE_ASN1;
|
||||
#[pyattr]
|
||||
const ENCODING_PEM_AUX: i32 = sys::X509_FILETYPE_PEM + 0x100;
|
||||
|
||||
// OpenSSL error codes for unexpected EOF detection
|
||||
const ERR_LIB_SSL: i32 = 20;
|
||||
const SSL_R_UNEXPECTED_EOF_WHILE_READING: i32 = 294;
|
||||
|
||||
// the openssl version from the API headers
|
||||
|
||||
#[pyattr(name = "OPENSSL_VERSION")]
|
||||
@@ -349,32 +368,6 @@ mod _ssl {
|
||||
fn _nid2obj(nid: Nid) -> Option<Asn1Object> {
|
||||
unsafe { ptr2obj(sys::OBJ_nid2obj(nid.as_raw())) }
|
||||
}
|
||||
fn obj2txt(obj: &Asn1ObjectRef, no_name: bool) -> Option<String> {
|
||||
let no_name = i32::from(no_name);
|
||||
let ptr = obj.as_ptr();
|
||||
let b = unsafe {
|
||||
let buflen = sys::OBJ_obj2txt(std::ptr::null_mut(), 0, ptr, no_name);
|
||||
assert!(buflen >= 0);
|
||||
if buflen == 0 {
|
||||
return None;
|
||||
}
|
||||
let buflen = buflen as usize;
|
||||
let mut buf = Vec::<u8>::with_capacity(buflen + 1);
|
||||
let ret = sys::OBJ_obj2txt(
|
||||
buf.as_mut_ptr() as *mut libc::c_char,
|
||||
buf.capacity() as _,
|
||||
ptr,
|
||||
no_name,
|
||||
);
|
||||
assert!(ret >= 0);
|
||||
// SAFETY: OBJ_obj2txt initialized the buffer successfully
|
||||
buf.set_len(buflen);
|
||||
buf
|
||||
};
|
||||
let s = String::from_utf8(b)
|
||||
.unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned());
|
||||
Some(s)
|
||||
}
|
||||
|
||||
type PyNid = (libc::c_int, String, String, Option<String>);
|
||||
fn obj2py(obj: &Asn1ObjectRef, vm: &VirtualMachine) -> PyResult<PyNid> {
|
||||
@@ -387,7 +380,12 @@ mod _ssl {
|
||||
.long_name()
|
||||
.map_err(|_| vm.new_value_error("NID has no long name".to_owned()))?
|
||||
.to_owned();
|
||||
Ok((nid.as_raw(), short_name, long_name, obj2txt(obj, true)))
|
||||
Ok((
|
||||
nid.as_raw(),
|
||||
short_name,
|
||||
long_name,
|
||||
cert::obj2txt(obj, true),
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(FromArgs)]
|
||||
@@ -1219,46 +1217,54 @@ mod _ssl {
|
||||
}
|
||||
|
||||
#[pymethod]
|
||||
fn get_unverified_chain(&self, vm: &VirtualMachine) -> Option<PyObjectRef> {
|
||||
fn get_unverified_chain(&self, vm: &VirtualMachine) -> PyResult<Option<PyListRef>> {
|
||||
let stream = self.stream.read();
|
||||
let chain = stream.ssl().peer_cert_chain()?;
|
||||
let Some(chain) = stream.ssl().peer_cert_chain() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Return Certificate objects
|
||||
let certs: Vec<PyObjectRef> = chain
|
||||
.iter()
|
||||
.filter_map(|cert| cert.to_der().ok().map(|der| vm.ctx.new_bytes(der).into()))
|
||||
.collect();
|
||||
|
||||
Some(vm.ctx.new_list(certs).into())
|
||||
.map(|cert| unsafe {
|
||||
sys::X509_up_ref(cert.as_ptr());
|
||||
let owned = X509::from_ptr(cert.as_ptr());
|
||||
cert_to_certificate(vm, owned)
|
||||
})
|
||||
.collect::<PyResult<_>>()?;
|
||||
Ok(Some(vm.ctx.new_list(certs)))
|
||||
}
|
||||
|
||||
#[pymethod]
|
||||
fn get_verified_chain(&self, vm: &VirtualMachine) -> Option<PyListRef> {
|
||||
fn get_verified_chain(&self, vm: &VirtualMachine) -> PyResult<Option<PyListRef>> {
|
||||
let stream = self.stream.read();
|
||||
unsafe {
|
||||
let chain = sys::SSL_get0_verified_chain(stream.ssl().as_ptr());
|
||||
if chain.is_null() {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let num_certs = sys::OPENSSL_sk_num(chain as *const _);
|
||||
let mut certs = Vec::new();
|
||||
|
||||
let mut certs = Vec::with_capacity(num_certs as usize);
|
||||
// Return Certificate objects
|
||||
for i in 0..num_certs {
|
||||
let cert_ptr = sys::OPENSSL_sk_value(chain as *const _, i) as *mut sys::X509;
|
||||
if cert_ptr.is_null() {
|
||||
continue;
|
||||
}
|
||||
let cert = X509Ref::from_ptr(cert_ptr);
|
||||
if let Ok(der) = cert.to_der() {
|
||||
certs.push(vm.ctx.new_bytes(der).into());
|
||||
}
|
||||
// Clone the X509 certificate to create an owned copy
|
||||
sys::X509_up_ref(cert_ptr);
|
||||
let owned_cert = X509::from_ptr(cert_ptr);
|
||||
let cert_obj = cert_to_certificate(vm, owned_cert)?;
|
||||
certs.push(cert_obj);
|
||||
}
|
||||
|
||||
if certs.is_empty() {
|
||||
Ok(if certs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(vm.ctx.new_list(certs))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1978,7 +1984,10 @@ mod _ssl {
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn convert_openssl_error(vm: &VirtualMachine, err: ErrorStack) -> PyBaseExceptionRef {
|
||||
pub(crate) fn convert_openssl_error(
|
||||
vm: &VirtualMachine,
|
||||
err: ErrorStack,
|
||||
) -> PyBaseExceptionRef {
|
||||
let cls = PySslError::class(&vm.ctx).to_owned();
|
||||
match err.errors().last() {
|
||||
Some(e) => {
|
||||
@@ -2047,18 +2056,49 @@ mod _ssl {
|
||||
),
|
||||
ssl::ErrorCode::SYSCALL => match e.io_error() {
|
||||
Some(io_err) => return io_err.to_pyexception(vm),
|
||||
None => (
|
||||
PySslSyscallError::class(&vm.ctx).to_owned(),
|
||||
"EOF occurred in violation of protocol",
|
||||
),
|
||||
// When no I/O error and OpenSSL error queue is empty,
|
||||
// this is an EOF in violation of protocol -> SSLEOFError
|
||||
// Need to set args[0] = SSL_ERROR_EOF for suppress_ragged_eofs check
|
||||
None => {
|
||||
return vm.new_exception(
|
||||
PySslEOFError::class(&vm.ctx).to_owned(),
|
||||
vec![
|
||||
vm.ctx.new_int(SSL_ERROR_EOF).into(),
|
||||
vm.ctx
|
||||
.new_str("EOF occurred in violation of protocol")
|
||||
.into(),
|
||||
],
|
||||
);
|
||||
}
|
||||
},
|
||||
ssl::ErrorCode::SSL => match e.ssl_error() {
|
||||
Some(e) => return convert_openssl_error(vm, e.clone()),
|
||||
None => (
|
||||
ssl::ErrorCode::SSL => {
|
||||
// Check for OpenSSL 3.0 SSL_R_UNEXPECTED_EOF_WHILE_READING
|
||||
if let Some(ssl_err) = e.ssl_error() {
|
||||
// In OpenSSL 3.0+, unexpected EOF is reported as SSL_ERROR_SSL
|
||||
// with this specific reason code instead of SSL_ERROR_SYSCALL
|
||||
unsafe {
|
||||
let err_code = sys::ERR_peek_last_error();
|
||||
let reason = sys::ERR_GET_REASON(err_code);
|
||||
let lib = sys::ERR_GET_LIB(err_code);
|
||||
if lib == ERR_LIB_SSL && reason == SSL_R_UNEXPECTED_EOF_WHILE_READING {
|
||||
return vm.new_exception(
|
||||
vm.class("_ssl", "SSLEOFError"),
|
||||
vec![
|
||||
vm.ctx.new_int(SSL_ERROR_EOF).into(),
|
||||
vm.ctx
|
||||
.new_str("EOF occurred in violation of protocol")
|
||||
.into(),
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
return convert_openssl_error(vm, ssl_err.clone());
|
||||
}
|
||||
(
|
||||
PySslError::class(&vm.ctx).to_owned(),
|
||||
"A failure in the SSL library occurred",
|
||||
),
|
||||
},
|
||||
)
|
||||
}
|
||||
_ => (
|
||||
PySslError::class(&vm.ctx).to_owned(),
|
||||
"A failure in the SSL library occurred",
|
||||
@@ -2106,93 +2146,6 @@ mod _ssl {
|
||||
(cipher.name(), cipher.version(), cipher.bits().secret)
|
||||
}
|
||||
|
||||
fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult {
|
||||
let r = if binary {
|
||||
let b = cert.to_der().map_err(|e| convert_openssl_error(vm, e))?;
|
||||
vm.ctx.new_bytes(b).into()
|
||||
} else {
|
||||
let dict = vm.ctx.new_dict();
|
||||
|
||||
let name_to_py = |name: &x509::X509NameRef| -> PyResult {
|
||||
let list = name
|
||||
.entries()
|
||||
.map(|entry| {
|
||||
let txt = obj2txt(entry.object(), false).to_pyobject(vm);
|
||||
let data = vm.ctx.new_str(entry.data().as_utf8()?.to_owned());
|
||||
Ok(vm.new_tuple(((txt, data),)).into())
|
||||
})
|
||||
.collect::<Result<_, _>>()
|
||||
.map_err(|e| convert_openssl_error(vm, e))?;
|
||||
Ok(vm.ctx.new_tuple(list).into())
|
||||
};
|
||||
|
||||
dict.set_item("subject", name_to_py(cert.subject_name())?, vm)?;
|
||||
dict.set_item("issuer", name_to_py(cert.issuer_name())?, vm)?;
|
||||
// X.509 version: OpenSSL uses 0-based (0=v1, 1=v2, 2=v3) but Python uses 1-based (1=v1, 2=v2, 3=v3)
|
||||
dict.set_item("version", vm.new_pyobj(cert.version() + 1), vm)?;
|
||||
|
||||
let serial_num = cert
|
||||
.serial_number()
|
||||
.to_bn()
|
||||
.and_then(|bn| bn.to_hex_str())
|
||||
.map_err(|e| convert_openssl_error(vm, e))?;
|
||||
dict.set_item(
|
||||
"serialNumber",
|
||||
vm.ctx.new_str(serial_num.to_owned()).into(),
|
||||
vm,
|
||||
)?;
|
||||
|
||||
dict.set_item(
|
||||
"notBefore",
|
||||
vm.ctx.new_str(cert.not_before().to_string()).into(),
|
||||
vm,
|
||||
)?;
|
||||
dict.set_item(
|
||||
"notAfter",
|
||||
vm.ctx.new_str(cert.not_after().to_string()).into(),
|
||||
vm,
|
||||
)?;
|
||||
|
||||
#[allow(clippy::manual_map)]
|
||||
if let Some(names) = cert.subject_alt_names() {
|
||||
let san = names
|
||||
.iter()
|
||||
.filter_map(|gen_name| {
|
||||
if let Some(email) = gen_name.email() {
|
||||
Some(vm.new_tuple((ascii!("email"), email)).into())
|
||||
} else if let Some(dnsname) = gen_name.dnsname() {
|
||||
Some(vm.new_tuple((ascii!("DNS"), dnsname)).into())
|
||||
} else if let Some(ip) = gen_name.ipaddress() {
|
||||
Some(
|
||||
vm.new_tuple((
|
||||
ascii!("IP Address"),
|
||||
String::from_utf8_lossy(ip).into_owned(),
|
||||
))
|
||||
.into(),
|
||||
)
|
||||
} else {
|
||||
// TODO: convert every type of general name:
|
||||
// https://github.com/python/cpython/blob/3.6/Modules/_ssl.c#L1092-L1231
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
dict.set_item("subjectAltName", vm.ctx.new_tuple(san).into(), vm)?;
|
||||
};
|
||||
|
||||
dict.into()
|
||||
};
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn _test_decode_cert(path: FsPath, vm: &VirtualMachine) -> PyResult {
|
||||
let path = path.to_path_buf(vm)?;
|
||||
let pem = std::fs::read(path).map_err(|e| e.to_pyexception(vm))?;
|
||||
let x509 = X509::from_pem(&pem).map_err(|e| convert_openssl_error(vm, e))?;
|
||||
cert_to_py(vm, &x509, false)
|
||||
}
|
||||
|
||||
impl Read for SocketStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let mut socket: &PySocket = &self.0;
|
||||
|
||||
234
stdlib/src/ssl/cert.rs
Normal file
234
stdlib/src/ssl/cert.rs
Normal file
@@ -0,0 +1,234 @@
|
||||
pub(super) use ssl_cert::{PySSLCertificate, cert_to_certificate, cert_to_py, obj2txt};
|
||||
|
||||
// Certificate type for SSL module
|
||||
|
||||
#[pymodule(sub)]
|
||||
pub(crate) mod ssl_cert {
|
||||
use crate::{
|
||||
common::ascii,
|
||||
vm::{
|
||||
PyObjectRef, PyPayload, PyResult, VirtualMachine,
|
||||
convert::{ToPyException, ToPyObject},
|
||||
function::{FsPath, OptionalArg},
|
||||
},
|
||||
};
|
||||
use foreign_types_shared::ForeignTypeRef;
|
||||
use openssl::{
|
||||
asn1::Asn1ObjectRef,
|
||||
x509::{self, X509, X509Ref},
|
||||
};
|
||||
use openssl_sys as sys;
|
||||
use std::fmt;
|
||||
|
||||
// Import constants and error converter from _ssl module
|
||||
use crate::ssl::_ssl::{ENCODING_DER, ENCODING_PEM, convert_openssl_error};
|
||||
|
||||
pub(crate) fn obj2txt(obj: &Asn1ObjectRef, no_name: bool) -> Option<String> {
|
||||
let no_name = i32::from(no_name);
|
||||
let ptr = obj.as_ptr();
|
||||
let b = unsafe {
|
||||
let buflen = sys::OBJ_obj2txt(std::ptr::null_mut(), 0, ptr, no_name);
|
||||
assert!(buflen >= 0);
|
||||
if buflen == 0 {
|
||||
return None;
|
||||
}
|
||||
let buflen = buflen as usize;
|
||||
let mut buf = Vec::<u8>::with_capacity(buflen + 1);
|
||||
let ret = sys::OBJ_obj2txt(
|
||||
buf.as_mut_ptr() as *mut libc::c_char,
|
||||
buf.capacity() as _,
|
||||
ptr,
|
||||
no_name,
|
||||
);
|
||||
assert!(ret >= 0);
|
||||
// SAFETY: OBJ_obj2txt initialized the buffer successfully
|
||||
buf.set_len(buflen);
|
||||
buf
|
||||
};
|
||||
let s = String::from_utf8(b)
|
||||
.unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned());
|
||||
Some(s)
|
||||
}
|
||||
|
||||
#[pyattr]
|
||||
#[pyclass(module = "ssl", name = "Certificate")]
|
||||
#[derive(PyPayload)]
|
||||
pub(crate) struct PySSLCertificate {
|
||||
cert: X509,
|
||||
}
|
||||
|
||||
impl fmt::Debug for PySSLCertificate {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("Certificate")
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
impl PySSLCertificate {
|
||||
#[pymethod]
|
||||
fn public_bytes(
|
||||
&self,
|
||||
format: OptionalArg<i32>,
|
||||
vm: &VirtualMachine,
|
||||
) -> PyResult<PyObjectRef> {
|
||||
let format = format.unwrap_or(ENCODING_PEM);
|
||||
|
||||
match format {
|
||||
x if x == ENCODING_DER => {
|
||||
// DER encoding
|
||||
let der = self
|
||||
.cert
|
||||
.to_der()
|
||||
.map_err(|e| convert_openssl_error(vm, e))?;
|
||||
Ok(vm.ctx.new_bytes(der).into())
|
||||
}
|
||||
x if x == ENCODING_PEM => {
|
||||
// PEM encoding
|
||||
let pem = self
|
||||
.cert
|
||||
.to_pem()
|
||||
.map_err(|e| convert_openssl_error(vm, e))?;
|
||||
Ok(vm.ctx.new_bytes(pem).into())
|
||||
}
|
||||
_ => Err(vm.new_value_error("Unsupported format".to_owned())),
|
||||
}
|
||||
}
|
||||
|
||||
#[pymethod]
|
||||
fn get_info(&self, vm: &VirtualMachine) -> PyResult {
|
||||
cert_to_dict(vm, &self.cert)
|
||||
}
|
||||
}
|
||||
|
||||
fn name_to_py(vm: &VirtualMachine, name: &x509::X509NameRef) -> PyResult {
|
||||
let list = name
|
||||
.entries()
|
||||
.map(|entry| {
|
||||
let txt = obj2txt(entry.object(), false).to_pyobject(vm);
|
||||
let asn1_str = entry.data();
|
||||
let data_bytes = asn1_str.as_slice();
|
||||
let data = match std::str::from_utf8(data_bytes) {
|
||||
Ok(s) => vm.ctx.new_str(s.to_owned()),
|
||||
Err(_) => vm
|
||||
.ctx
|
||||
.new_str(String::from_utf8_lossy(data_bytes).into_owned()),
|
||||
};
|
||||
Ok(vm.new_tuple(((txt, data),)).into())
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
Ok(vm.ctx.new_tuple(list).into())
|
||||
}
|
||||
|
||||
// Helper to convert X509 to dict (for getpeercert with binary=False)
|
||||
fn cert_to_dict(vm: &VirtualMachine, cert: &X509Ref) -> PyResult {
|
||||
let dict = vm.ctx.new_dict();
|
||||
|
||||
dict.set_item("subject", name_to_py(vm, cert.subject_name())?, vm)?;
|
||||
dict.set_item("issuer", name_to_py(vm, cert.issuer_name())?, vm)?;
|
||||
// X.509 version: OpenSSL uses 0-based (0=v1, 1=v2, 2=v3) but Python uses 1-based (1=v1, 2=v2, 3=v3)
|
||||
dict.set_item("version", vm.new_pyobj(cert.version() + 1), vm)?;
|
||||
|
||||
let serial_num = cert
|
||||
.serial_number()
|
||||
.to_bn()
|
||||
.and_then(|bn| bn.to_hex_str())
|
||||
.map_err(|e| convert_openssl_error(vm, e))?;
|
||||
dict.set_item(
|
||||
"serialNumber",
|
||||
vm.ctx.new_str(serial_num.to_owned()).into(),
|
||||
vm,
|
||||
)?;
|
||||
|
||||
dict.set_item(
|
||||
"notBefore",
|
||||
vm.ctx.new_str(cert.not_before().to_string()).into(),
|
||||
vm,
|
||||
)?;
|
||||
dict.set_item(
|
||||
"notAfter",
|
||||
vm.ctx.new_str(cert.not_after().to_string()).into(),
|
||||
vm,
|
||||
)?;
|
||||
|
||||
if let Some(names) = cert.subject_alt_names() {
|
||||
let san: Vec<PyObjectRef> = names
|
||||
.iter()
|
||||
.map(|gen_name| {
|
||||
if let Some(email) = gen_name.email() {
|
||||
vm.new_tuple((ascii!("email"), email)).into()
|
||||
} else if let Some(dnsname) = gen_name.dnsname() {
|
||||
vm.new_tuple((ascii!("DNS"), dnsname)).into()
|
||||
} else if let Some(ip) = gen_name.ipaddress() {
|
||||
// Parse IP address properly (IPv4 or IPv6)
|
||||
let ip_str = if ip.len() == 4 {
|
||||
// IPv4
|
||||
format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3])
|
||||
} else if ip.len() == 16 {
|
||||
// IPv6 - format like: "X:X:X:X:X:X:X:X" (not compressed)
|
||||
format!(
|
||||
"{:X}:{:X}:{:X}:{:X}:{:X}:{:X}:{:X}:{:X}",
|
||||
(ip[0] as u16) << 8 | ip[1] as u16,
|
||||
(ip[2] as u16) << 8 | ip[3] as u16,
|
||||
(ip[4] as u16) << 8 | ip[5] as u16,
|
||||
(ip[6] as u16) << 8 | ip[7] as u16,
|
||||
(ip[8] as u16) << 8 | ip[9] as u16,
|
||||
(ip[10] as u16) << 8 | ip[11] as u16,
|
||||
(ip[12] as u16) << 8 | ip[13] as u16,
|
||||
(ip[14] as u16) << 8 | ip[15] as u16
|
||||
)
|
||||
} else {
|
||||
// Fallback for unexpected length
|
||||
String::from_utf8_lossy(ip).into_owned()
|
||||
};
|
||||
vm.new_tuple((ascii!("IP Address"), ip_str)).into()
|
||||
} else if let Some(uri) = gen_name.uri() {
|
||||
vm.new_tuple((ascii!("URI"), uri)).into()
|
||||
} else {
|
||||
// Handle DirName, Registered ID, and othername
|
||||
// Check if this is a directory name
|
||||
if let Some(dirname) = gen_name.directory_name()
|
||||
&& let Ok(py_name) = name_to_py(vm, dirname)
|
||||
{
|
||||
return vm.new_tuple((ascii!("DirName"), py_name)).into();
|
||||
}
|
||||
|
||||
// TODO: Handle Registered ID (GEN_RID)
|
||||
// CPython implementation uses i2t_ASN1_OBJECT to convert OID
|
||||
// This requires accessing GENERAL_NAME union which is complex in Rust
|
||||
// For now, we return <unsupported> for unhandled types
|
||||
|
||||
// For othername and other unsupported types
|
||||
vm.new_tuple((ascii!("othername"), ascii!("<unsupported>")))
|
||||
.into()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
dict.set_item("subjectAltName", vm.ctx.new_tuple(san).into(), vm)?;
|
||||
};
|
||||
|
||||
Ok(dict.into())
|
||||
}
|
||||
|
||||
// Helper to create Certificate object from X509
|
||||
pub(crate) fn cert_to_certificate(vm: &VirtualMachine, cert: X509) -> PyResult {
|
||||
Ok(PySSLCertificate { cert }.into_ref(&vm.ctx).into())
|
||||
}
|
||||
|
||||
// For getpeercert() - returns bytes or dict depending on binary flag
|
||||
pub(crate) fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult {
|
||||
if binary {
|
||||
let b = cert.to_der().map_err(|e| convert_openssl_error(vm, e))?;
|
||||
Ok(vm.ctx.new_bytes(b).into())
|
||||
} else {
|
||||
cert_to_dict(vm, cert)
|
||||
}
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
pub(crate) fn _test_decode_cert(path: FsPath, vm: &VirtualMachine) -> PyResult {
|
||||
let path = path.to_path_buf(vm)?;
|
||||
let pem = std::fs::read(path).map_err(|e| e.to_pyexception(vm))?;
|
||||
let x509 = X509::from_pem(&pem).map_err(|e| convert_openssl_error(vm, e))?;
|
||||
cert_to_py(vm, &x509, false)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user