Add zoneinfo from Python 3.12.6 (#5400)

This commit is contained in:
Ricardo Robles
2024-09-19 04:18:48 +02:00
committed by GitHub
parent 23f7cbf1c3
commit b5c1fd95dc
15 changed files with 4363 additions and 0 deletions

View File

@@ -0,0 +1,111 @@
from enum import Enum
import functools
import unittest
__all__ = [
"given",
"example",
"assume",
"reject",
"register_random",
"strategies",
"HealthCheck",
"settings",
"Verbosity",
]
from . import strategies
def given(*_args, **_kwargs):
def decorator(f):
if examples := getattr(f, "_examples", []):
@functools.wraps(f)
def test_function(self):
for example_args, example_kwargs in examples:
with self.subTest(*example_args, **example_kwargs):
f(self, *example_args, **example_kwargs)
else:
# If we have found no examples, we must skip the test. If @example
# is applied after @given, it will re-wrap the test to remove the
# skip decorator.
test_function = unittest.skip(
"Hypothesis required for property test with no " +
"specified examples"
)(f)
test_function._given = True
return test_function
return decorator
def example(*args, **kwargs):
if bool(args) == bool(kwargs):
raise ValueError("Must specify exactly one of *args or **kwargs")
def decorator(f):
base_func = getattr(f, "__wrapped__", f)
if not hasattr(base_func, "_examples"):
base_func._examples = []
base_func._examples.append((args, kwargs))
if getattr(f, "_given", False):
# If the given decorator is below all the example decorators,
# it would be erroneously skipped, so we need to re-wrap the new
# base function.
f = given()(base_func)
return f
return decorator
def assume(condition):
if not condition:
raise unittest.SkipTest("Unsatisfied assumption")
return True
def reject():
assume(False)
def register_random(*args, **kwargs):
pass # pragma: no cover
def settings(*args, **kwargs):
return lambda f: f # pragma: nocover
class HealthCheck(Enum):
data_too_large = 1
filter_too_much = 2
too_slow = 3
return_value = 5
large_base_example = 7
not_a_test_method = 8
@classmethod
def all(cls):
return list(cls)
class Verbosity(Enum):
quiet = 0
normal = 1
verbose = 2
debug = 3
class Phase(Enum):
explicit = 0
reuse = 1
generate = 2
target = 3
shrink = 4
explain = 5

View File

@@ -0,0 +1,43 @@
# Stub out only the subset of the interface that we actually use in our tests.
class StubClass:
def __init__(self, *args, **kwargs):
self.__stub_args = args
self.__stub_kwargs = kwargs
self.__repr = None
def _with_repr(self, new_repr):
new_obj = self.__class__(*self.__stub_args, **self.__stub_kwargs)
new_obj.__repr = new_repr
return new_obj
def __repr__(self):
if self.__repr is not None:
return self.__repr
argstr = ", ".join(self.__stub_args)
kwargstr = ", ".join(f"{kw}={val}" for kw, val in self.__stub_kwargs.items())
in_parens = argstr
if kwargstr:
in_parens += ", " + kwargstr
return f"{self.__class__.__qualname__}({in_parens})"
def stub_factory(klass, name, *, with_repr=None, _seen={}):
if (klass, name) not in _seen:
class Stub(klass):
def __init__(self, *args, **kwargs):
super().__init__()
self.__stub_args = args
self.__stub_kwargs = kwargs
Stub.__name__ = name
Stub.__qualname__ = name
if with_repr is not None:
Stub._repr = None
_seen.setdefault((klass, name, with_repr), Stub)
return _seen[(klass, name, with_repr)]

View File

@@ -0,0 +1,91 @@
import functools
from ._helpers import StubClass, stub_factory
class StubStrategy(StubClass):
def __make_trailing_repr(self, transformation_name, func):
func_name = func.__name__ or repr(func)
return f"{self!r}.{transformation_name}({func_name})"
def map(self, pack):
return self._with_repr(self.__make_trailing_repr("map", pack))
def flatmap(self, expand):
return self._with_repr(self.__make_trailing_repr("flatmap", expand))
def filter(self, condition):
return self._with_repr(self.__make_trailing_repr("filter", condition))
def __or__(self, other):
new_repr = f"one_of({self!r}, {other!r})"
return self._with_repr(new_repr)
_STRATEGIES = {
"binary",
"booleans",
"builds",
"characters",
"complex_numbers",
"composite",
"data",
"dates",
"datetimes",
"decimals",
"deferred",
"dictionaries",
"emails",
"fixed_dictionaries",
"floats",
"fractions",
"from_regex",
"from_type",
"frozensets",
"functions",
"integers",
"iterables",
"just",
"lists",
"none",
"nothing",
"one_of",
"permutations",
"random_module",
"randoms",
"recursive",
"register_type_strategy",
"runner",
"sampled_from",
"sets",
"shared",
"slices",
"timedeltas",
"times",
"text",
"tuples",
"uuids",
}
__all__ = sorted(_STRATEGIES)
def composite(f):
strategy = stub_factory(StubStrategy, f.__name__)
@functools.wraps(f)
def inner(*args, **kwargs):
return strategy(*args, **kwargs)
return inner
def __getattr__(name):
if name not in _STRATEGIES:
raise AttributeError(f"Unknown attribute {name}")
return stub_factory(StubStrategy, f"hypothesis.strategies.{name}")
def __dir__():
return __all__

45
Lib/test/support/hypothesis_helper.py vendored Normal file
View File

@@ -0,0 +1,45 @@
import os
try:
import hypothesis
except ImportError:
from . import _hypothesis_stubs as hypothesis
else:
# Regrtest changes to use a tempdir as the working directory, so we have
# to tell Hypothesis to use the original in order to persist the database.
from .os_helper import SAVEDCWD
from hypothesis.configuration import set_hypothesis_home_dir
set_hypothesis_home_dir(os.path.join(SAVEDCWD, ".hypothesis"))
# When using the real Hypothesis, we'll configure it to ignore occasional
# slow tests (avoiding flakiness from random VM slowness in CI).
hypothesis.settings.register_profile(
"slow-is-ok",
deadline=None,
suppress_health_check=[
hypothesis.HealthCheck.too_slow,
hypothesis.HealthCheck.differing_executors,
],
)
hypothesis.settings.load_profile("slow-is-ok")
# For local development, we'll write to the default on-local-disk database
# of failing examples, and also use a pull-through cache to automatically
# replay any failing examples discovered in CI. For details on how this
# works, see https://hypothesis.readthedocs.io/en/latest/database.html
if "CI" not in os.environ:
from hypothesis.database import (
GitHubArtifactDatabase,
MultiplexedDatabase,
ReadOnlyDatabase,
)
hypothesis.settings.register_profile(
"cpython-local-dev",
database=MultiplexedDatabase(
hypothesis.settings.default.database,
ReadOnlyDatabase(GitHubArtifactDatabase("python", "cpython")),
),
)
hypothesis.settings.load_profile("cpython-local-dev")

5
Lib/test/test_zoneinfo/__init__.py vendored Normal file
View File

@@ -0,0 +1,5 @@
import os
from test.support import load_package_tests
def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)

3
Lib/test/test_zoneinfo/__main__.py vendored Normal file
View File

@@ -0,0 +1,3 @@
import unittest
unittest.main('test.test_zoneinfo')

100
Lib/test/test_zoneinfo/_support.py vendored Normal file
View File

@@ -0,0 +1,100 @@
import contextlib
import functools
import sys
import threading
import unittest
from test.support.import_helper import import_fresh_module
OS_ENV_LOCK = threading.Lock()
TZPATH_LOCK = threading.Lock()
TZPATH_TEST_LOCK = threading.Lock()
def call_once(f):
"""Decorator that ensures a function is only ever called once."""
lock = threading.Lock()
cached = functools.lru_cache(None)(f)
@functools.wraps(f)
def inner():
with lock:
return cached()
return inner
@call_once
def get_modules():
"""Retrieve two copies of zoneinfo: pure Python and C accelerated.
Because this function manipulates the import system in a way that might
be fragile or do unexpected things if it is run many times, it uses a
`call_once` decorator to ensure that this is only ever called exactly
one time — in other words, when using this function you will only ever
get one copy of each module rather than a fresh import each time.
"""
import zoneinfo as c_module
py_module = import_fresh_module("zoneinfo", blocked=["_zoneinfo"])
return py_module, c_module
@contextlib.contextmanager
def set_zoneinfo_module(module):
"""Make sure sys.modules["zoneinfo"] refers to `module`.
This is necessary because `pickle` will refuse to serialize
an type calling itself `zoneinfo.ZoneInfo` unless `zoneinfo.ZoneInfo`
refers to the same object.
"""
NOT_PRESENT = object()
old_zoneinfo = sys.modules.get("zoneinfo", NOT_PRESENT)
sys.modules["zoneinfo"] = module
yield
if old_zoneinfo is not NOT_PRESENT:
sys.modules["zoneinfo"] = old_zoneinfo
else: # pragma: nocover
sys.modules.pop("zoneinfo")
class ZoneInfoTestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.klass = cls.module.ZoneInfo
super().setUpClass()
@contextlib.contextmanager
def tzpath_context(self, tzpath, block_tzdata=True, lock=TZPATH_LOCK):
def pop_tzdata_modules():
tzdata_modules = {}
for modname in list(sys.modules):
if modname.split(".", 1)[0] != "tzdata": # pragma: nocover
continue
tzdata_modules[modname] = sys.modules.pop(modname)
return tzdata_modules
with lock:
if block_tzdata:
# In order to fully exclude tzdata from the path, we need to
# clear the sys.modules cache of all its contents — setting the
# root package to None is not enough to block direct access of
# already-imported submodules (though it will prevent new
# imports of submodules).
tzdata_modules = pop_tzdata_modules()
sys.modules["tzdata"] = None
old_path = self.module.TZPATH
try:
self.module.reset_tzpath(tzpath)
yield
finally:
if block_tzdata:
sys.modules.pop("tzdata")
for modname, module in tzdata_modules.items():
sys.modules[modname] = module
self.module.reset_tzpath(old_path)

View File

View File

@@ -0,0 +1,190 @@
{
"data": {
"Africa/Abidjan": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j-~f{VGF<>F7KxBg5R*{Ksocg8-YYVul=v7vZzaHN",
"uC=da5UI2rH18c!OnjV{y4u(+A!!VBKmY&$ORw>7UO^(500B;v0RR91bXh%WvBYQl0ssI2",
"00dcD"
],
"Africa/Casablanca": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0b&Kz+C_;7KxBg5R*{N&yjMUR~;C-fDaSOU;q-~",
"FqW+4{YBjbcw}`a!dW>b)R2-0a+uwf`P3{_Y@HuCz}S$J$ZJ>R_V<~|Fk>sgX4=%0vUrh-",
"lt@YP^Wrus;j?`Th#xRPzf<<~Hp4DH^gZX>d{+WOp~HNu8!{uWu}&XphAd{j1;rB4|9?R!",
"pqruAFUMt8#*WcrVS{;kLlY(cJRV$w?d2car%R<ALOSO?^`4;ZZtI)%f^^G^>s>q9BgTU4",
"Ht-tQKZ7Z`9QqOb?R#b%z?rk>!CkH7jy3wja4NG2q)H}fNRKg8v{);Em;K3Cncf4C6&Oaj",
"V+DbX%o4+)CV3+e!Lm6dutu(0BQpH1T?W(~cQtKV*^_Pdx!LirjpTs?Bmt@vktjLq4;)O!",
"rrly=c*rwTwMJFd0I57`hgkc?=nyI4RZf9W$6DCWugmf&)wk^tWH17owj=#PGH7Xv-?9$j",
"njwDlkOE+BFNR9YXEmBpO;rqEw=e2IR-8^(W;8ma?M3JVd($2T>IW+0tk|Gm8>ftukRQ9J",
"8k3brzqMnVyjsLI-CKneFa)Lxvp_a<CkQEd#(pMA^rr}rBNElGA=*!M)puBdoErR9{kWL@",
"w=svMc6eZ^-(vQZrV<u^PY#nOIUDJ8%A&;BUVlY9=;@i2j2J1_`P>q40f}0J3VVoWL5rox",
"`Kptivcp}o5xA^@>qNI%?zo=Yj4AMV?kbAA)j(1%)+Pp)bSn+7Yk`M{oE}L-Z!G6<Dgq&*",
"(C-mFJfbEGDH5M^vBr65rcnsx*~|Em_GeU#B)(+T!|MG-nxj0@IPbp-nHejH3~>OMr5G+h",
"p)$3Lg{ono{4cN>Vr&>L4kXH;_VnBL5U!LgzqE%P7QQ*<E!guRW2SE@ayq@)G2nXqA2tGo",
"QIgc6>tue}O`3(TZ0`aKn&~8trOQ-rBXCp)f@P6RMO4l0+;b|5-pk9_ryNh}Zc*v%mvz_#",
"yd<xXt%~gT90dn4e{Ac<baL-)Y{L7&5G($I$>6fjB0g9{MmMnu8bG%#C~ugXK^S^k@?ab#",
"O|aE>dDTt4s4n69(~@t~!wniV%g<uWQat_i6>7khFx~I*4>Y|V$4j5%KPF*-FyKIi@!Ho&",
"x8QQsksYt8)D+W)Ni!=G`ogSu^vLL-l#7A7=iIAKL2SuZk9F}NfNk86VI)9WZE?%2wC-ya",
"F~z#Qsq)LH0|_D8^5fU8X%GeQ4TB>R-dlziA&tZe&1ada208!$nk`7bOFO2S00G<w{Sp8G",
"{cR_IvBYQl0ssI200dcD"
],
"America/Los_Angeles": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0qH3OkDsf7KxBg5R*;z{h&-RlhRYu$%jt%!jv+I",
"JxhE=%W1?wYb!37Rb?(rgwFIAQI{L#8r*zy!$TMtER_1(vn(Zix^{AVB1(jwr$iL6h0Z!2",
"8Gb~UW@0~e512{Z%8}Qzdnjl~wJ1{c2>`Z@1A~t&lyL{p{eM{5)QGf7Mo5FW9==mlyXJt2",
"UwpntR7H0eSq!(aYq#aqUz&RM*tvuMI)AsM?K3-dV3-TT{t)!Iy#JTo=tXkzAM9~j2YbiO",
"ls3(H8Dc>Y|D1aqL51vjLbpYG;GvGTQB4bXuJ%mA;(B4eUpu$$@zv2vVcq-Y)VKbzp^tei",
"uzy}R{Luv<C;_cPe*n$Z<jeC9ogWF9=1mvvUYXS>DjpuVb`79O+CBmg{Wx!bvx$eu4zRE&",
"PehMb=&G<9$>iZ|bFE)0=4I?KLFGBC0I(0_svgw0%FiMsT%koo*!nEYc6GY@QnU}&4Isg;",
"l=|khi(!VaiSE2=Ny`&&tpi~~;{$u<GHlsr3Ze!iYsU205RFKsLnrXwOL?Mq08xffgS{6h",
"E|figx+&N%wbO}re@|}$l;g_6J-Wl%j|qev8A<T?NJ)`;2neGi_DHE4ET*W!c*ggPAgU+L",
"E9=bH7;maCUikw^R)UM;TdVvNkQ;FGgN=yQER`SZ1nOgPXr0LCebLety&}kVdmVmB=8eSg",
"td!1%p=a2wooIL!Da}OPXvKBfRo?YxqS>N}%f|7mBhAy;<Er2&_LfND#qXN~Mkgf!@4VFA",
"Hr%$c)wrKA2cJYWK2>s3YT^sy!$eG~?`9mNJC9@4Bac_p^BZh)Yd_rWW5qh-?tKY(>5VHO",
"L*iT8P@wCavLj^yYbnDR+4ukhS+xPrpl)iqB?u)bj9a2aW==g6G3lCJd>(+Blf<d4CF%7u",
"tlBUDki}J-!_Dy}5S(MrxSXy~$Z+hgH3P^<<w7D72L7I-R%H3(xm&q_DXxkp$owLTS6Wzk",
"hc3nn;laROa3)6hl&gH#)2Lif8fZe$@CdeJ-Zn&*>r)~^40F4f>cRZ^UF;RibfZ>0m73hR",
"C{$vTfC(STN`g7(B<=Z2556{}0`?p&|Akkst!4Xy4OT;A@c$XTUI3FRRjy*KA7uC56FD)z",
"^X{WV*sr(w!c$W357o!&eLO2wTDNOyw@gf(&R<<LF_3URI4=Ei`-%dM3T66j#9!aG7&b_@",
"g1-9vo?DzXZ5vGaf~w__p_@_X?OdvQ_r5bvy2hpESTf+{p?jL+!~!{g8-<-5$@d8EZV&-5",
"@a|;^1gB*R-~{EHFA-td_G2bt;~Y}>t;=-Tu1TV{>%8ZVATC9tjD8|(&`$9YHvZ9bVe#>w",
"|8c;Tg|xE&)`*}LwM*E}q}q8^Qja%p`_U)*5DdLI9O@!e=3jFjOCrCq28b_bb;s>%D#iJB",
"CWJi{JH!Js;6nfayos$kq^OEX00HO-lokL0!mqm{vBYQl0ssI200dcD"
],
"America/Santiago": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0fRZ<6QtM7KxBg84(fsEAUJ$J{f-TXlPEUec5Ee",
"n+hsD4lC(QYax=JdSpoyje8%VM`GW}<Unz6IOY4=y66tfqG2X4E8xIJQ(~?r{`L~T!sI~o",
"VBl7Ao!R1A76Y8P6Y<TfwVHf@sl@S-D4OuAy5mq0MKJZ>{bJ8@y$A8O&*$pw{(f~Os#}2w",
"eX6^Rgi$IT%n^V^85L>$_c7{cB^#ogV=rHBJGiz-RQNFGK?gdPi|q)j`&8)}KJ{qo6dixa",
"9@yYyVg+%lo0nO+Tw0-w2hJ%mafy<Co(;L+24CYl&?rN0mrh90nxG?%1&Ed@za`Yd>WL)|",
")<o0dZL-*?RFtH7dAv%G*O%l?qvq!0F5C?K#_ZoT{P$77IMoj3&8w3f&n36zquu~s`s0T)",
";>?W6Bi%FWuGPA1Dru$XR4SZANsAthU2EoKH<MU4wYvUTlZGcLIDR+hSik>F6oEtKq`rwP",
"(VNegnI_NI%;ma$)wj{k!@KFB30Yo)IOr<QX7IQ@TBq9d;e3QAtYU?$PS-WoaiqwFrg4PR",
"A->l>)$)D|+(5h&+%2vuwGuy^@S8FT^s21V5};>VA9Iu;?8bHz#r<;JtfZDI1(FT@edh0#",
"MYW$A1qkMGIwTZqqdYNE3gl#zp&NbL9Mp=voqN|;?gqR&4$)1`znddtEyuKS*^nMMD=0^>",
"7^z6-C4P67UWOXuMBubP>j6i~03aR@jD^-Y`JSYu#Yp0P8dLLJ0QOPE8=BoiuRX59YW7xg",
"WiexjHX%&0?`ZQCdxCdL^qd1v@kOjQKaWo2Y1++~LcA%FTq?5o<?(jL(_Uo}I}k_Fwflcr",
"aovwSR_(ILA6li<iBLPQ0#rEet;W-*54kj#sZEGK*tAF{)HNkn#&Hc5`#eaRF;N#$<xQU?",
"E%zm?2+b5Ho>%}fX1-RIvlB)1#iTNomGnUL=nM!>Ix|AGtON7!F1O?53kqlC2o-`ZGw*+s",
"NM$^9znsIJMwlgscE`|O3|;BRgsQMYm~`uv+nvuv`nigRa}X=BX=A5Sw$)WEklF7&c>_~$",
"zJ(m--bqXgiN^w-U=BJH9C0Qro(x90zo@rK;&TJ$nI@&k$ORgOb2<MjjIhYfr;pFUGdMd!",
"0d&bOvyq3AZPCez8E(XSg2hBu2A&^k?w|1u8v3JE>s%gWbc}ok_27)Eoku~Fq|B-Ps+4J_",
"HPJMLJ2^_)cOU$p&3kNAlrV!)%~6r$BJ>OOi~=-<6byle{?zd4J{NG}o8tw|+#ZNLcpNwk",
"TuPE~sbJB8_RZb2DopStO+Wwux~F#S59zm%00I98;S&G=b(j+6vBYQl0ssI200dcD"
],
"Asia/Tokyo": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j-~luMgIxeB7KxBg5R*;y?l4Rl4neXH3cv!OtfK@h",
"KZzauI)S!FSDREPhhBS6Fb$&Vv#7%;?Te|>pF^0HBr&z_Tk<%vMW_QqjevRZOp8XVFgP<8",
"TkT#`9H&0Ua;gT1#rZLV0HqbAKK;_z@nO;6t0L<i8TZ+%T<;ci2bYSG1u!mUSO5S3XcbN8",
"dIxbZ00Ex?wE_SDJu@vkvBYQl0ssI200dcD"
],
"Australia/Sydney": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0T)o7+nA=7KxBg5R*_t6jS5T`_Ull(nK1_YY;k%",
";_YdTuU3*!K)eKg@^kzjAtbo@Jd|KGai=Q%%sX5FI?*?LG!|m9cKH5~IEwI=PAr_Yc}w35",
">}hOdk<>TdUa07R(LPI6@!GU$ty4=mwqHG-XVe*n(Yvgdlr+FqIU18!osi)48t~eWX8)&L",
"G)Ud^0zz@*AF+2r7E}N<P$kOfo*88g)_bOO?7N1Jr|HJyg+HXc7f4}?%Dur3w|~JU?<x4K",
"%RRC~q_D87;UyN{nLRu!fEqKeRR*U$vs>f9Y72K~o-T%}D&z%}#7g<qim`EbfhF7ntyAiP",
"%LFNc&!$@Kv)Olyf&Y9%(#SkM+%yI}S%b+@ZM2dH7DpmndGMIda<(`#E9q|?H(HzClx+l;",
"M?IEz1eF}r?}ay!V9?9rKD^-ayjE@wUMD$2kC!iwH`n=eVrJPmJyNKaW`LdJ68&u;2nF1K",
"kZjKCY_A<>2br?oH6ZiYH^%>J3D)TPKV(JY*bwjuw5=DsPB@~CrR<E_U_fJTF9ufU%!cXK",
"_4uM#!%%Q1e1G~{E}~vGVE0{Kxecm^NjtJM`c8EFHFTiUIVl@YUD8F+s!u8jz~6hte@oa|",
"qayb*^Lwd(etNmBro;aXQjkY8g(*`_JQ0%{V3QP2l!GGQ7D+v&k_PK0F(?f{GziU5>OZeN",
"x>A*H&CHrWt0`EP`m!F%waepl#|w#&`XgVc?~2M3uw$fGX~tf_Il!q#Aa<*8xlzQ2+7r6Z",
"^;Laa9F(WB_O&Dy2r>~@kSi16W{=6+i5GV=Uq~KX*~&HUN4oz7*O(gXIr}sDVcD`Ikgw#|",
"50ssal8s)Qy;?YGCf;*UKKKN!T4!Kqy_G;7<gSrPK{)5#a>PfQapugqvVBKy12v3TVH^L2",
"0?#5*VP~MOYfe$h`*L!7@tiW|_^X1N%<}`7YahiUYtMu5XwmOf3?dr+@zXHwW`z}ZDqZlT",
"<2Cs(<1%M!i6o&VK89BY0J7HPIo;O62s=|IbV^@y$N&#<x=a876<(U>=>i^F00FcHoDl#3",
"Mdv&xvBYQl0ssI200dcD"
],
"Europe/Dublin": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0>b$_+0=h7KxBg5R*;&J77#T_U2R5sleVWFDmK~",
"Kzj5oh@`<njquRZ&tJIS(cXp1>QKHvW^6V{jU-w>qg1tSt0c^vh;?qAqA0%t?;#S~6U8Qi",
"v&f1s9IH#g$m1k1a#3+lylw4mwT4QnEUUQdwg+xnEcBlgu31bAVabn41OMZVLGz6NDwG%X",
"uQar!b>GI{qSahE`AG}$kRWbuI~JCt;38)Xwbb~Qggs55t+MAHIxgDxzTJ;2xXx99+qCy4",
"45kC#v_l8fx|G&jlVvaciR<-wwf22l%4(t@S6tnX39#_K(4S0fu$FUs$isu<UOJYm|4)2i",
"aEpsajn@}B#rnY=Cg_TXsm-A)*adXV&$klNTn3n{XXlaquu}6m{k%oRmY0Yyhlj*<W{D5m",
"22}OiqnwHT!tnK`wPqx?wiF%v{ipTrOkcJ5P@7OC4(-l`*&SB$Wd4Vf8gn?>d<i@%mP*e*",
"ttDj`9M1;9$YV@dhT)DVcwdq(Ly~KDm_&KL?{_mFwwYtJqRZBk)i1FVQy!40w_KyAg?hIA",
"=_{(3#S0eWsF8f%_4Zza$4@$lSmov+Huyn$vP^zJ|8-<C3#q#0kEs9cNg^xUR(m?wEWt-D",
"GctAh2nIo~fz%$m$I41=b_WuJ6M9g#A9_Epwqw{d0B|vzmg#_y<=_>9IKzCXB<o`d)**5V",
"6g!<<Jw1n5TrN-$)aYz4cLsTmpsUf-6L7ix+kk>78NkARYq@9Dc0TGkhz);NtM_SSzEffN",
"l{2^*CKGdp52h!52A)6q9fUSltXF{T*Ehc9Q7u8!W7pE(Fv$D$cKUAt6wY=DA1mGgxC*VX",
"q_If3G#FY6-Voj`fIKk`0}Cc72_SD{v>468LV{pyBI33^p0E?}RwDA6Pkq--C~0jF&Z@Pv",
"!dx_1SN_)jwz@P$(oK%P!Tk9?fRjK88yxhxlcFtTjjZ$DYssSsa#ufYrR+}}nKS+r384o~",
"!Uw$nwTbF~qgRsgr0N#d@KIinx%<pnyQ!|>hQB(SJyjJtDtIy(%mDm}ZBGN}dV6K~om|=U",
"VGkbciQ=^$_14|gT21!YQ)@y*Rd0i_lS6gtPBE9+ah%WIJPwzUTjIr+J1XckkmA!6WE16%",
"CVAl{Dn&-)=G$Bjh?bh0$Xt1UDcgXJjXzzojuw0>paV~?Sa`VN3FysqF<S*L0RYSAY3jt(",
"8wCD04RfyEcP(RNT%x7k(7m-9H3{zuQ`RZy-Rz%*&dldDVFF+TwSAPO1wRX^5W5@xJ9{vW",
"w?rc^NH({%Ie<rxKqSVy!Le-_`U&@W_(D+>xTzfKVAu*ucq#+m=|KSSMvp_#@-lwd+q*ue",
"FQ^5<D+|jLr?k{O39i8AX2Qb^zi9A<7XD1y!-W2|0Hk8JVkN;gl><|<0R-u4qYMbRqzSn&",
"Q7jSuvc%b+EZc%>nI(+&0Tl1Y>a6v4`uNFD-7$QrhHgS7Wnv~rDgfH;rQw3+m`LJxoM4v#",
"gK@?|B{RHJ*VxZgk#!p<_&-sjxOda0YaiJ1UnG41VPv(Et%ElzKRMcO$AfgU+Xnwg5p2_+",
"NrnZ1WfEj^fmHd^sx@%JWKkh#zaK0ox%rdP)zUmGZZnqmZ_9L=%6R8ibJH0bOT$AGhDo6{",
"fJ?;_U;D|^>5by2ul@i4Zf()InfFN}00EQ=q#FPL>RM>svBYQl0ssI200dcD"
],
"Europe/Lisbon": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0=rf*IfWA7KxBg5R*;*X|PN+G3LqthM?xgkNUN_",
")gCt1Sc%YT6^TTomk4yVHXeyvQj8}l<;q&s7K}#Vnc8lII1?)AHh$*>OKUU4S;*h>v*ep0",
"xTi1cK2{aY*|2D*-~K<;-{_W+r@NvZ7-|NZv($ek_C%VfP0xjWeZP#CPXD`IKkakjh(kUd",
"&H)m;^Q(jGjIyiyrcUMtOP)u3A>sw6ux;Bmp3x$4QvQKMx5TrCx_!$srWQuXNs&`9=^IY1",
"yc&C31!sQh7P=Mk*#6x8Z@5^%ehR8UW<EvzdWer9z;R6PrdUaWab3G>$OWw0KMw}P1ycI^",
"4eh12oBUOV?S>n*d!+EM@>x#9PZD12iD=zaC;7`8dTfkU_6d}OZvSFSbGgXeKw}XyX@D=(",
")D0!^DBGr8pXWBT$S-yhLP>Z3ys^VW<kSQr?{jhl<+{Fki;mTI=&Stgy$rttN?ulQM$lDr",
"G7))C7Dx=J6V-e^(Qk|r;f~TvIw1KqRIC{8f^jPy#blstV{-&2a}ZJe!Zr2c_R4NT)L@bs",
"+gRRm6Wn)VWVNHeK*TEV=f#2KZqu%y?mTx#EfRiK0)TG7$$~=LGxx@0D|lS2up|oCON{YQ",
"oN5-H$!_n-Kx2*=RO!epEX>3}RQ6{NGGVJG6vf*MH93vvNW6yLjie1;{4tVhg-KnSf|G`!",
"Z;j$7gJ1ows~RD=@n7I6aFd8rOR_7Y?E-$clI%1o5gA@O!KPa^(8^iFFeFykI-+z>E$mvp",
"E_h`vbHPjqkLs`Dn-0FV`R@z|h!S(Lb;M&|Exr<u8#s-T(>!biY`%bfp$6`hK;GDhdP|^Q",
"*Ty*}1d41K>H2B{jrjE9aFK>yAQJBX9CD%-384S;0fw`PlprHGS`^b$oS-`I4VH7ji8ou-",
"g|060jfb1XcxiInT0oO<S+<vh^)XY;lr@|IeXj}%k;}|kSlDGaYidk^zB|gEYaet~F%QYd",
"f7pbnQKLZ0o7=kso86doS;J@aQ>oeR7#%e5Ug5#KW)nV<Rc;|LjUDdhk8*dYJQwYN?hzH%",
"0<XB$!(rpf2nxaL22M`L4pKx>SRvLHNe$SQHM@2)`S9L7>RL@<XAlxVQfb2=%lcu!h+Um0",
"Q+Z=itevTFy}-Jl<g5crK55BF`VsoPH~qP3QrG%YtrD#s{=gA7p)QI<i=EwY(cel8`B=#u",
"Yq<K;4T(QBF_GvrYueSk*}gfrCSg22+YH-1N<WYkp|DA-P-&va<Xu<}^yafJKlzezB-lS{",
"a++P_^gYmgrc9FO-K3s~`jAcqVV!k?NV2IFV^86`cr>Qx%fmm7?3u7P5TywFQ}C@S(pq}|",
"eLPT{C^{<0Q?uU&kSVd%!~8q3;Z0s3OqzF`$HRkePL5Ywgiwn{R(<RY8ut&RJ;$?J*w*n)",
">zi+jmOBFrVpW;)@UsU#%$8BcV#h@}m$#!Fglo&bwb78aYqOG_W7h{eb(+39&-mk4EIXq_",
"_`30=8sfA3=!3TO_TyS5X22~?6nKngZ|bq=grdq=9X)3xAkA42L!~rmS)n3w-~;lgz%Fhn",
"(?rXdp2ho~9?wmVs2JwVt~?@FVD%`tN69{(i3oQa;O0<Hp#T5?$WIy3h`IlL00Hv}jT-;}",
"Z2tpNvBYQl0ssI200dcD"
],
"Europe/London": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j;0`|pJ6!-O7KxBg5R*;$9DqzW!kQs3DZt(=0_!m1",
"4wvE`6N%Vj#u6PS_3S?~(2)&xn8}2}3Wr#kG8n2!x8>$E$lF&~Y#_H6bu6(BiwblJ>;-Fs",
"gA$Y$*?=X)n1pFkKn}F~`>=4)+LLQk?L*P!bhAm0;`N~z3QbUIyVrm%kOZ(n1JJsm0pyb8",
"!GV{d*C!9KXv;4v<seWRpo=ZZxGf)-5Qsn$3dw`uhF)+6#mgUoNF-Y2jN73pVhdTs*p0`Z",
"AbnT1puEtudB{Nul>D4Q>-k#+x(!V5L@w5M>v2V5<gcLskF+p`aGTSn{sY8^@MUc;2o{&V",
"R!$180N}BtfYKS)i9w=!<~&l?1Cv^PWs!&a9{s(35^yqGU$72DKX|IkRtDblB>a`B>t(|B",
"|Fqr4^-{S*%Ep~ojUtx_CRbSQ(uFwu2=KH)Q@EBs@ZqRXn4mU;B!68;;IQs3Ub=n&UU%*m",
"k&zwD36&JSwsN(%k&x?H+tN^6)23c`I0=5^N_R0~1>tsFZ`^`3z~rXSXT&qcwa#n!%+Z#P",
"PG}(D^_CCILXnF|GKwabBh*xFS?4rwGo2vtJUwzrbv_$5PO+`?$l{H-jGB@X%S!OAhw;D4",
"XFycN3!XqQ&EorJOD3>~^U%Luw!jF<;6_q-f-S|6<EHry?%{@fuyH`_+D%uTA@g0$5e!Yi",
"P1vQuevyS;jE(-R>{cQDfZ2(4Xf1MMLr1=SA=MwVf2%Pp%VP;jn)|5Tf!-DbUGn%I-r<KG",
"4jJ(Y#L-fJUpUb$yNfvhX*iqWZoG7T*WUfE6iQD9_^EWqExH`rc&jJ<o^E8-mM10WrZ_Vv",
"xx9nj<vMlEt*KfP*pyth!c_AKnrKtQTACX08#{pioAFnDq!53+h*hO^f*yrWjg0u2pUcgv",
"UlpEZ9G_dlhlW1J^h@gTt7{KPL2mRal;1juJ3Q8-!GXO#IPzT4ciJ-nB+nkphssM}Q7IAT",
"pM}AT%y(J!78F?>kYaH7?$$O!t)wwClAisr3eUoeB^~T=U*_P~Y2*KdnO87>B!19sV=xZ5",
"yApq26RxgqA|*tmsvtL#OhcF(C<0EGWHP)BF<g*iSWicU6k1<Ps?BQ$IWg-#s2uF-qXgJ_",
"!H_mZIMx*L%&a*_6;_trMCULk0ZYM<hfJlYBddHwRyYUDu3!C_lJZWTQ?c-R&@9054pj0k",
"kQ{Xi{A$&)&b#^G*}8w^qE5i<@aDxaJQs2E$W)AIqUXO{gQ;U8|FA%BD~sORzq44)AntUu",
"QHBO{{Pi<EpK!$x4(~7w)la!dN=M@L_j};6|5G&QfuO~2?Q7996z)78fqW<D#8tKNV(*qc",
"mfA>l?h)_*7!{LoJiv%RsOs!q->n+DcV%9~B@Rb<ISu%16c5H-7zQIq+SuS+s<lQOWK5+C",
"d*>C_1G_1g6`Yd~8|%-=2l~oGN!~TVv2Bnk>7wW8L@^?vX$f3AiT)(4nrCuTm9%(XC6Nai",
"E(;}7&=YZagjAN$O-cN;1u{dTkElmB0GT$|Wa)QMmKrx<|LCJ9qlUoFsUbD^H^6_8(w<0{",
"ftj&O1~p_%lh5z;zNV&sP<T$*OgK)_0B#JDtXOkhC;Bo7h)#RUy;vBiVLN-T$*7t*t9@ey",
"3Woa&24QZ_z38BQ@A(A<(9n@%R?}B`7%w2wowt~UU;bAlqCzr(H$M5t==jGIqMqCsE=Jwa",
"$3P+3^&|~i28@=d_u6Cgthe(Lq(wxKpdSDL|7X6Un<nrt00Gwuz#ISo`BbmvvBYQl0ssI2",
"00dcD"
],
"Pacific/Kiritimati": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j-~jCaVO;<!7KxBg5R*{K!`A|q%C5j6({{dSEy5>+",
"NF2>iK{8KMUf+)<-)VxXbLxD(alL}N$AT-ogNbJSMMYeX+Z{jS)b8TK^PB=FxyBxzfmFto",
"eo0R`a(%NO?#aEH9|?Cv00000NIsFh6BW2800DjO0RR918Pu^`vBYQl0ssI200dcD"
],
"UTC": [
"{Wp48S^xk9=GL@E0stWa761SMbT8$j-~e#|9bEt_7KxBg5R*|3h1|xhHLji!C57qW6L*|H",
"pEErm00000ygu;I+>V)?00B92fhY-(AGY&-0RR9100dcD"
]
},
"metadata": {
"version": "2020a"
}
}

2259
Lib/test/test_zoneinfo/test_zoneinfo.py vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,368 @@
import contextlib
import datetime
import os
import pickle
import unittest
import zoneinfo
from test.support.hypothesis_helper import hypothesis
import test.test_zoneinfo._support as test_support
ZoneInfoTestBase = test_support.ZoneInfoTestBase
py_zoneinfo, c_zoneinfo = test_support.get_modules()
UTC = datetime.timezone.utc
MIN_UTC = datetime.datetime.min.replace(tzinfo=UTC)
MAX_UTC = datetime.datetime.max.replace(tzinfo=UTC)
ZERO = datetime.timedelta(0)
def _valid_keys():
"""Get available time zones, including posix/ and right/ directories."""
from importlib import resources
available_zones = sorted(zoneinfo.available_timezones())
TZPATH = zoneinfo.TZPATH
def valid_key(key):
for root in TZPATH:
key_file = os.path.join(root, key)
if os.path.exists(key_file):
return True
components = key.split("/")
package_name = ".".join(["tzdata.zoneinfo"] + components[:-1])
resource_name = components[-1]
try:
return resources.files(package_name).joinpath(resource_name).is_file()
except ModuleNotFoundError:
return False
# This relies on the fact that dictionaries maintain insertion order — for
# shrinking purposes, it is preferable to start with the standard version,
# then move to the posix/ version, then to the right/ version.
out_zones = {"": available_zones}
for prefix in ["posix", "right"]:
prefix_out = []
for key in available_zones:
prefix_key = f"{prefix}/{key}"
if valid_key(prefix_key):
prefix_out.append(prefix_key)
out_zones[prefix] = prefix_out
output = []
for keys in out_zones.values():
output.extend(keys)
return output
VALID_KEYS = _valid_keys()
if not VALID_KEYS:
raise unittest.SkipTest("No time zone data available")
def valid_keys():
return hypothesis.strategies.sampled_from(VALID_KEYS)
KEY_EXAMPLES = [
"Africa/Abidjan",
"Africa/Casablanca",
"America/Los_Angeles",
"America/Santiago",
"Asia/Tokyo",
"Australia/Sydney",
"Europe/Dublin",
"Europe/Lisbon",
"Europe/London",
"Pacific/Kiritimati",
"UTC",
]
def add_key_examples(f):
for key in KEY_EXAMPLES:
f = hypothesis.example(key)(f)
return f
class ZoneInfoTest(ZoneInfoTestBase):
module = py_zoneinfo
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_str(self, key):
zi = self.klass(key)
self.assertEqual(str(zi), key)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_key(self, key):
zi = self.klass(key)
self.assertEqual(zi.key, key)
@hypothesis.given(
dt=hypothesis.strategies.one_of(
hypothesis.strategies.datetimes(), hypothesis.strategies.times()
)
)
@hypothesis.example(dt=datetime.datetime.min)
@hypothesis.example(dt=datetime.datetime.max)
@hypothesis.example(dt=datetime.datetime(1970, 1, 1))
@hypothesis.example(dt=datetime.datetime(2039, 1, 1))
@hypothesis.example(dt=datetime.time(0))
@hypothesis.example(dt=datetime.time(12, 0))
@hypothesis.example(dt=datetime.time(23, 59, 59, 999999))
def test_utc(self, dt):
zi = self.klass("UTC")
dt_zi = dt.replace(tzinfo=zi)
self.assertEqual(dt_zi.utcoffset(), ZERO)
self.assertEqual(dt_zi.dst(), ZERO)
self.assertEqual(dt_zi.tzname(), "UTC")
class CZoneInfoTest(ZoneInfoTest):
module = c_zoneinfo
class ZoneInfoPickleTest(ZoneInfoTestBase):
module = py_zoneinfo
def setUp(self):
with contextlib.ExitStack() as stack:
stack.enter_context(test_support.set_zoneinfo_module(self.module))
self.addCleanup(stack.pop_all().close)
super().setUp()
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_pickle_unpickle_cache(self, key):
zi = self.klass(key)
pkl_str = pickle.dumps(zi)
zi_rt = pickle.loads(pkl_str)
self.assertIs(zi, zi_rt)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_pickle_unpickle_no_cache(self, key):
zi = self.klass.no_cache(key)
pkl_str = pickle.dumps(zi)
zi_rt = pickle.loads(pkl_str)
self.assertIsNot(zi, zi_rt)
self.assertEqual(str(zi), str(zi_rt))
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_pickle_unpickle_cache_multiple_rounds(self, key):
"""Test that pickle/unpickle is idempotent."""
zi_0 = self.klass(key)
pkl_str_0 = pickle.dumps(zi_0)
zi_1 = pickle.loads(pkl_str_0)
pkl_str_1 = pickle.dumps(zi_1)
zi_2 = pickle.loads(pkl_str_1)
pkl_str_2 = pickle.dumps(zi_2)
self.assertEqual(pkl_str_0, pkl_str_1)
self.assertEqual(pkl_str_1, pkl_str_2)
self.assertIs(zi_0, zi_1)
self.assertIs(zi_0, zi_2)
self.assertIs(zi_1, zi_2)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_pickle_unpickle_no_cache_multiple_rounds(self, key):
"""Test that pickle/unpickle is idempotent."""
zi_cache = self.klass(key)
zi_0 = self.klass.no_cache(key)
pkl_str_0 = pickle.dumps(zi_0)
zi_1 = pickle.loads(pkl_str_0)
pkl_str_1 = pickle.dumps(zi_1)
zi_2 = pickle.loads(pkl_str_1)
pkl_str_2 = pickle.dumps(zi_2)
self.assertEqual(pkl_str_0, pkl_str_1)
self.assertEqual(pkl_str_1, pkl_str_2)
self.assertIsNot(zi_0, zi_1)
self.assertIsNot(zi_0, zi_2)
self.assertIsNot(zi_1, zi_2)
self.assertIsNot(zi_0, zi_cache)
self.assertIsNot(zi_1, zi_cache)
self.assertIsNot(zi_2, zi_cache)
class CZoneInfoPickleTest(ZoneInfoPickleTest):
module = c_zoneinfo
class ZoneInfoCacheTest(ZoneInfoTestBase):
module = py_zoneinfo
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_cache(self, key):
zi_0 = self.klass(key)
zi_1 = self.klass(key)
self.assertIs(zi_0, zi_1)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_no_cache(self, key):
zi_0 = self.klass.no_cache(key)
zi_1 = self.klass.no_cache(key)
self.assertIsNot(zi_0, zi_1)
class CZoneInfoCacheTest(ZoneInfoCacheTest):
klass = c_zoneinfo.ZoneInfo
class PythonCConsistencyTest(unittest.TestCase):
"""Tests that the C and Python versions do the same thing."""
def _is_ambiguous(self, dt):
return dt.replace(fold=not dt.fold).utcoffset() == dt.utcoffset()
@hypothesis.given(dt=hypothesis.strategies.datetimes(), key=valid_keys())
@hypothesis.example(dt=datetime.datetime.min, key="America/New_York")
@hypothesis.example(dt=datetime.datetime.max, key="America/New_York")
@hypothesis.example(dt=datetime.datetime(1970, 1, 1), key="America/New_York")
@hypothesis.example(dt=datetime.datetime(2020, 1, 1), key="Europe/Paris")
@hypothesis.example(dt=datetime.datetime(2020, 6, 1), key="Europe/Paris")
def test_same_str(self, dt, key):
py_dt = dt.replace(tzinfo=py_zoneinfo.ZoneInfo(key))
c_dt = dt.replace(tzinfo=c_zoneinfo.ZoneInfo(key))
self.assertEqual(str(py_dt), str(c_dt))
@hypothesis.given(dt=hypothesis.strategies.datetimes(), key=valid_keys())
@hypothesis.example(dt=datetime.datetime(1970, 1, 1), key="America/New_York")
@hypothesis.example(dt=datetime.datetime(2020, 2, 5), key="America/New_York")
@hypothesis.example(dt=datetime.datetime(2020, 8, 12), key="America/New_York")
@hypothesis.example(dt=datetime.datetime(2040, 1, 1), key="Africa/Casablanca")
@hypothesis.example(dt=datetime.datetime(1970, 1, 1), key="Europe/Paris")
@hypothesis.example(dt=datetime.datetime(2040, 1, 1), key="Europe/Paris")
@hypothesis.example(dt=datetime.datetime.min, key="Asia/Tokyo")
@hypothesis.example(dt=datetime.datetime.max, key="Asia/Tokyo")
def test_same_offsets_and_names(self, dt, key):
py_dt = dt.replace(tzinfo=py_zoneinfo.ZoneInfo(key))
c_dt = dt.replace(tzinfo=c_zoneinfo.ZoneInfo(key))
self.assertEqual(py_dt.tzname(), c_dt.tzname())
self.assertEqual(py_dt.utcoffset(), c_dt.utcoffset())
self.assertEqual(py_dt.dst(), c_dt.dst())
@hypothesis.given(
dt=hypothesis.strategies.datetimes(timezones=hypothesis.strategies.just(UTC)),
key=valid_keys(),
)
@hypothesis.example(dt=MIN_UTC, key="Asia/Tokyo")
@hypothesis.example(dt=MAX_UTC, key="Asia/Tokyo")
@hypothesis.example(dt=MIN_UTC, key="America/New_York")
@hypothesis.example(dt=MAX_UTC, key="America/New_York")
@hypothesis.example(
dt=datetime.datetime(2006, 10, 29, 5, 15, tzinfo=UTC),
key="America/New_York",
)
def test_same_from_utc(self, dt, key):
py_zi = py_zoneinfo.ZoneInfo(key)
c_zi = c_zoneinfo.ZoneInfo(key)
# Convert to UTC: This can overflow, but we just care about consistency
py_overflow_exc = None
c_overflow_exc = None
try:
py_dt = dt.astimezone(py_zi)
except OverflowError as e:
py_overflow_exc = e
try:
c_dt = dt.astimezone(c_zi)
except OverflowError as e:
c_overflow_exc = e
if (py_overflow_exc is not None) != (c_overflow_exc is not None):
raise py_overflow_exc or c_overflow_exc # pragma: nocover
if py_overflow_exc is not None:
return # Consistently raises the same exception
# PEP 495 says that an inter-zone comparison between ambiguous
# datetimes is always False.
if py_dt != c_dt:
self.assertEqual(
self._is_ambiguous(py_dt),
self._is_ambiguous(c_dt),
(py_dt, c_dt),
)
self.assertEqual(py_dt.tzname(), c_dt.tzname())
self.assertEqual(py_dt.utcoffset(), c_dt.utcoffset())
self.assertEqual(py_dt.dst(), c_dt.dst())
@hypothesis.given(dt=hypothesis.strategies.datetimes(), key=valid_keys())
@hypothesis.example(dt=datetime.datetime.max, key="America/New_York")
@hypothesis.example(dt=datetime.datetime.min, key="America/New_York")
@hypothesis.example(dt=datetime.datetime.min, key="Asia/Tokyo")
@hypothesis.example(dt=datetime.datetime.max, key="Asia/Tokyo")
def test_same_to_utc(self, dt, key):
py_dt = dt.replace(tzinfo=py_zoneinfo.ZoneInfo(key))
c_dt = dt.replace(tzinfo=c_zoneinfo.ZoneInfo(key))
# Convert from UTC: Overflow OK if it happens in both implementations
py_overflow_exc = None
c_overflow_exc = None
try:
py_utc = py_dt.astimezone(UTC)
except OverflowError as e:
py_overflow_exc = e
try:
c_utc = c_dt.astimezone(UTC)
except OverflowError as e:
c_overflow_exc = e
if (py_overflow_exc is not None) != (c_overflow_exc is not None):
raise py_overflow_exc or c_overflow_exc # pragma: nocover
if py_overflow_exc is not None:
return # Consistently raises the same exception
self.assertEqual(py_utc, c_utc)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_cross_module_pickle(self, key):
py_zi = py_zoneinfo.ZoneInfo(key)
c_zi = c_zoneinfo.ZoneInfo(key)
with test_support.set_zoneinfo_module(py_zoneinfo):
py_pkl = pickle.dumps(py_zi)
with test_support.set_zoneinfo_module(c_zoneinfo):
c_pkl = pickle.dumps(c_zi)
with test_support.set_zoneinfo_module(c_zoneinfo):
# Python → C
py_to_c_zi = pickle.loads(py_pkl)
self.assertIs(py_to_c_zi, c_zi)
with test_support.set_zoneinfo_module(py_zoneinfo):
# C → Python
c_to_py_zi = pickle.loads(c_pkl)
self.assertIs(c_to_py_zi, py_zi)

31
Lib/zoneinfo/__init__.py vendored Normal file
View File

@@ -0,0 +1,31 @@
__all__ = [
"ZoneInfo",
"reset_tzpath",
"available_timezones",
"TZPATH",
"ZoneInfoNotFoundError",
"InvalidTZPathWarning",
]
from . import _tzpath
from ._common import ZoneInfoNotFoundError
try:
from _zoneinfo import ZoneInfo
except ImportError: # pragma: nocover
from ._zoneinfo import ZoneInfo
reset_tzpath = _tzpath.reset_tzpath
available_timezones = _tzpath.available_timezones
InvalidTZPathWarning = _tzpath.InvalidTZPathWarning
def __getattr__(name):
if name == "TZPATH":
return _tzpath.TZPATH
else:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
def __dir__():
return sorted(list(globals()) + ["TZPATH"])

164
Lib/zoneinfo/_common.py vendored Normal file
View File

@@ -0,0 +1,164 @@
import struct
def load_tzdata(key):
from importlib import resources
components = key.split("/")
package_name = ".".join(["tzdata.zoneinfo"] + components[:-1])
resource_name = components[-1]
try:
return resources.files(package_name).joinpath(resource_name).open("rb")
except (ImportError, FileNotFoundError, UnicodeEncodeError):
# There are three types of exception that can be raised that all amount
# to "we cannot find this key":
#
# ImportError: If package_name doesn't exist (e.g. if tzdata is not
# installed, or if there's an error in the folder name like
# Amrica/New_York)
# FileNotFoundError: If resource_name doesn't exist in the package
# (e.g. Europe/Krasnoy)
# UnicodeEncodeError: If package_name or resource_name are not UTF-8,
# such as keys containing a surrogate character.
raise ZoneInfoNotFoundError(f"No time zone found with key {key}")
def load_data(fobj):
header = _TZifHeader.from_file(fobj)
if header.version == 1:
time_size = 4
time_type = "l"
else:
# Version 2+ has 64-bit integer transition times
time_size = 8
time_type = "q"
# Version 2+ also starts with a Version 1 header and data, which
# we need to skip now
skip_bytes = (
header.timecnt * 5 # Transition times and types
+ header.typecnt * 6 # Local time type records
+ header.charcnt # Time zone designations
+ header.leapcnt * 8 # Leap second records
+ header.isstdcnt # Standard/wall indicators
+ header.isutcnt # UT/local indicators
)
fobj.seek(skip_bytes, 1)
# Now we need to read the second header, which is not the same
# as the first
header = _TZifHeader.from_file(fobj)
typecnt = header.typecnt
timecnt = header.timecnt
charcnt = header.charcnt
# The data portion starts with timecnt transitions and indices
if timecnt:
trans_list_utc = struct.unpack(
f">{timecnt}{time_type}", fobj.read(timecnt * time_size)
)
trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt))
else:
trans_list_utc = ()
trans_idx = ()
# Read the ttinfo struct, (utoff, isdst, abbrind)
if typecnt:
utcoff, isdst, abbrind = zip(
*(struct.unpack(">lbb", fobj.read(6)) for i in range(typecnt))
)
else:
utcoff = ()
isdst = ()
abbrind = ()
# Now read the abbreviations. They are null-terminated strings, indexed
# not by position in the array but by position in the unsplit
# abbreviation string. I suppose this makes more sense in C, which uses
# null to terminate the strings, but it's inconvenient here...
abbr_vals = {}
abbr_chars = fobj.read(charcnt)
def get_abbr(idx):
# Gets a string starting at idx and running until the next \x00
#
# We cannot pre-populate abbr_vals by splitting on \x00 because there
# are some zones that use subsets of longer abbreviations, like so:
#
# LMT\x00AHST\x00HDT\x00
#
# Where the idx to abbr mapping should be:
#
# {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"}
if idx not in abbr_vals:
span_end = abbr_chars.find(b"\x00", idx)
abbr_vals[idx] = abbr_chars[idx:span_end].decode()
return abbr_vals[idx]
abbr = tuple(get_abbr(idx) for idx in abbrind)
# The remainder of the file consists of leap seconds (currently unused) and
# the standard/wall and ut/local indicators, which are metadata we don't need.
# In version 2 files, we need to skip the unnecessary data to get at the TZ string:
if header.version >= 2:
# Each leap second record has size (time_size + 4)
skip_bytes = header.isutcnt + header.isstdcnt + header.leapcnt * 12
fobj.seek(skip_bytes, 1)
c = fobj.read(1) # Should be \n
assert c == b"\n", c
tz_bytes = b""
while (c := fobj.read(1)) != b"\n":
tz_bytes += c
tz_str = tz_bytes
else:
tz_str = None
return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str
class _TZifHeader:
__slots__ = [
"version",
"isutcnt",
"isstdcnt",
"leapcnt",
"timecnt",
"typecnt",
"charcnt",
]
def __init__(self, *args):
for attr, val in zip(self.__slots__, args, strict=True):
setattr(self, attr, val)
@classmethod
def from_file(cls, stream):
# The header starts with a 4-byte "magic" value
if stream.read(4) != b"TZif":
raise ValueError("Invalid TZif file: magic not found")
_version = stream.read(1)
if _version == b"\x00":
version = 1
else:
version = int(_version)
stream.read(15)
args = (version,)
# Slots are defined in the order that the bytes are arranged
args = args + struct.unpack(">6l", stream.read(24))
return cls(*args)
class ZoneInfoNotFoundError(KeyError):
"""Exception raised when a ZoneInfo key is not found."""

181
Lib/zoneinfo/_tzpath.py vendored Normal file
View File

@@ -0,0 +1,181 @@
import os
import sysconfig
def _reset_tzpath(to=None, stacklevel=4):
global TZPATH
tzpaths = to
if tzpaths is not None:
if isinstance(tzpaths, (str, bytes)):
raise TypeError(
f"tzpaths must be a list or tuple, "
+ f"not {type(tzpaths)}: {tzpaths!r}"
)
if not all(map(os.path.isabs, tzpaths)):
raise ValueError(_get_invalid_paths_message(tzpaths))
base_tzpath = tzpaths
else:
env_var = os.environ.get("PYTHONTZPATH", None)
if env_var is None:
env_var = sysconfig.get_config_var("TZPATH")
base_tzpath = _parse_python_tzpath(env_var, stacklevel)
TZPATH = tuple(base_tzpath)
def reset_tzpath(to=None):
"""Reset global TZPATH."""
# We need `_reset_tzpath` helper function because it produces a warning,
# it is used as both a module-level call and a public API.
# This is how we equalize the stacklevel for both calls.
_reset_tzpath(to)
def _parse_python_tzpath(env_var, stacklevel):
if not env_var:
return ()
raw_tzpath = env_var.split(os.pathsep)
new_tzpath = tuple(filter(os.path.isabs, raw_tzpath))
# If anything has been filtered out, we will warn about it
if len(new_tzpath) != len(raw_tzpath):
import warnings
msg = _get_invalid_paths_message(raw_tzpath)
warnings.warn(
"Invalid paths specified in PYTHONTZPATH environment variable. "
+ msg,
InvalidTZPathWarning,
stacklevel=stacklevel,
)
return new_tzpath
def _get_invalid_paths_message(tzpaths):
invalid_paths = (path for path in tzpaths if not os.path.isabs(path))
prefix = "\n "
indented_str = prefix + prefix.join(invalid_paths)
return (
"Paths should be absolute but found the following relative paths:"
+ indented_str
)
def find_tzfile(key):
"""Retrieve the path to a TZif file from a key."""
_validate_tzfile_path(key)
for search_path in TZPATH:
filepath = os.path.join(search_path, key)
if os.path.isfile(filepath):
return filepath
return None
_TEST_PATH = os.path.normpath(os.path.join("_", "_"))[:-1]
def _validate_tzfile_path(path, _base=_TEST_PATH):
if os.path.isabs(path):
raise ValueError(
f"ZoneInfo keys may not be absolute paths, got: {path}"
)
# We only care about the kinds of path normalizations that would change the
# length of the key - e.g. a/../b -> a/b, or a/b/ -> a/b. On Windows,
# normpath will also change from a/b to a\b, but that would still preserve
# the length.
new_path = os.path.normpath(path)
if len(new_path) != len(path):
raise ValueError(
f"ZoneInfo keys must be normalized relative paths, got: {path}"
)
resolved = os.path.normpath(os.path.join(_base, new_path))
if not resolved.startswith(_base):
raise ValueError(
f"ZoneInfo keys must refer to subdirectories of TZPATH, got: {path}"
)
del _TEST_PATH
def available_timezones():
"""Returns a set containing all available time zones.
.. caution::
This may attempt to open a large number of files, since the best way to
determine if a given file on the time zone search path is to open it
and check for the "magic string" at the beginning.
"""
from importlib import resources
valid_zones = set()
# Start with loading from the tzdata package if it exists: this has a
# pre-assembled list of zones that only requires opening one file.
try:
with resources.files("tzdata").joinpath("zones").open("r") as f:
for zone in f:
zone = zone.strip()
if zone:
valid_zones.add(zone)
except (ImportError, FileNotFoundError):
pass
def valid_key(fpath):
try:
with open(fpath, "rb") as f:
return f.read(4) == b"TZif"
except Exception: # pragma: nocover
return False
for tz_root in TZPATH:
if not os.path.exists(tz_root):
continue
for root, dirnames, files in os.walk(tz_root):
if root == tz_root:
# right/ and posix/ are special directories and shouldn't be
# included in the output of available zones
if "right" in dirnames:
dirnames.remove("right")
if "posix" in dirnames:
dirnames.remove("posix")
for file in files:
fpath = os.path.join(root, file)
key = os.path.relpath(fpath, start=tz_root)
if os.sep != "/": # pragma: nocover
key = key.replace(os.sep, "/")
if not key or key in valid_zones:
continue
if valid_key(fpath):
valid_zones.add(key)
if "posixrules" in valid_zones:
# posixrules is a special symlink-only time zone where it exists, it
# should not be included in the output
valid_zones.remove("posixrules")
return valid_zones
class InvalidTZPathWarning(RuntimeWarning):
"""Warning raised if an invalid path is specified in PYTHONTZPATH."""
TZPATH = ()
_reset_tzpath(stacklevel=5)

772
Lib/zoneinfo/_zoneinfo.py vendored Normal file
View File

@@ -0,0 +1,772 @@
import bisect
import calendar
import collections
import functools
import re
import weakref
from datetime import datetime, timedelta, tzinfo
from . import _common, _tzpath
EPOCH = datetime(1970, 1, 1)
EPOCHORDINAL = datetime(1970, 1, 1).toordinal()
# It is relatively expensive to construct new timedelta objects, and in most
# cases we're looking at the same deltas, like integer numbers of hours, etc.
# To improve speed and memory use, we'll keep a dictionary with references
# to the ones we've already used so far.
#
# Loading every time zone in the 2020a version of the time zone database
# requires 447 timedeltas, which requires approximately the amount of space
# that ZoneInfo("America/New_York") with 236 transitions takes up, so we will
# set the cache size to 512 so that in the common case we always get cache
# hits, but specifically crafted ZoneInfo objects don't leak arbitrary amounts
# of memory.
@functools.lru_cache(maxsize=512)
def _load_timedelta(seconds):
return timedelta(seconds=seconds)
class ZoneInfo(tzinfo):
_strong_cache_size = 8
_strong_cache = collections.OrderedDict()
_weak_cache = weakref.WeakValueDictionary()
__module__ = "zoneinfo"
def __init_subclass__(cls):
cls._strong_cache = collections.OrderedDict()
cls._weak_cache = weakref.WeakValueDictionary()
def __new__(cls, key):
instance = cls._weak_cache.get(key, None)
if instance is None:
instance = cls._weak_cache.setdefault(key, cls._new_instance(key))
instance._from_cache = True
# Update the "strong" cache
cls._strong_cache[key] = cls._strong_cache.pop(key, instance)
if len(cls._strong_cache) > cls._strong_cache_size:
cls._strong_cache.popitem(last=False)
return instance
@classmethod
def no_cache(cls, key):
obj = cls._new_instance(key)
obj._from_cache = False
return obj
@classmethod
def _new_instance(cls, key):
obj = super().__new__(cls)
obj._key = key
obj._file_path = obj._find_tzfile(key)
if obj._file_path is not None:
file_obj = open(obj._file_path, "rb")
else:
file_obj = _common.load_tzdata(key)
with file_obj as f:
obj._load_file(f)
return obj
@classmethod
def from_file(cls, fobj, /, key=None):
obj = super().__new__(cls)
obj._key = key
obj._file_path = None
obj._load_file(fobj)
obj._file_repr = repr(fobj)
# Disable pickling for objects created from files
obj.__reduce__ = obj._file_reduce
return obj
@classmethod
def clear_cache(cls, *, only_keys=None):
if only_keys is not None:
for key in only_keys:
cls._weak_cache.pop(key, None)
cls._strong_cache.pop(key, None)
else:
cls._weak_cache.clear()
cls._strong_cache.clear()
@property
def key(self):
return self._key
def utcoffset(self, dt):
return self._find_trans(dt).utcoff
def dst(self, dt):
return self._find_trans(dt).dstoff
def tzname(self, dt):
return self._find_trans(dt).tzname
def fromutc(self, dt):
"""Convert from datetime in UTC to datetime in local time"""
if not isinstance(dt, datetime):
raise TypeError("fromutc() requires a datetime argument")
if dt.tzinfo is not self:
raise ValueError("dt.tzinfo is not self")
timestamp = self._get_local_timestamp(dt)
num_trans = len(self._trans_utc)
if num_trans >= 1 and timestamp < self._trans_utc[0]:
tti = self._tti_before
fold = 0
elif (
num_trans == 0 or timestamp > self._trans_utc[-1]
) and not isinstance(self._tz_after, _ttinfo):
tti, fold = self._tz_after.get_trans_info_fromutc(
timestamp, dt.year
)
elif num_trans == 0:
tti = self._tz_after
fold = 0
else:
idx = bisect.bisect_right(self._trans_utc, timestamp)
if num_trans > 1 and timestamp >= self._trans_utc[1]:
tti_prev, tti = self._ttinfos[idx - 2 : idx]
elif timestamp > self._trans_utc[-1]:
tti_prev = self._ttinfos[-1]
tti = self._tz_after
else:
tti_prev = self._tti_before
tti = self._ttinfos[0]
# Detect fold
shift = tti_prev.utcoff - tti.utcoff
fold = shift.total_seconds() > timestamp - self._trans_utc[idx - 1]
dt += tti.utcoff
if fold:
return dt.replace(fold=1)
else:
return dt
def _find_trans(self, dt):
if dt is None:
if self._fixed_offset:
return self._tz_after
else:
return _NO_TTINFO
ts = self._get_local_timestamp(dt)
lt = self._trans_local[dt.fold]
num_trans = len(lt)
if num_trans and ts < lt[0]:
return self._tti_before
elif not num_trans or ts > lt[-1]:
if isinstance(self._tz_after, _TZStr):
return self._tz_after.get_trans_info(ts, dt.year, dt.fold)
else:
return self._tz_after
else:
# idx is the transition that occurs after this timestamp, so we
# subtract off 1 to get the current ttinfo
idx = bisect.bisect_right(lt, ts) - 1
assert idx >= 0
return self._ttinfos[idx]
def _get_local_timestamp(self, dt):
return (
(dt.toordinal() - EPOCHORDINAL) * 86400
+ dt.hour * 3600
+ dt.minute * 60
+ dt.second
)
def __str__(self):
if self._key is not None:
return f"{self._key}"
else:
return repr(self)
def __repr__(self):
if self._key is not None:
return f"{self.__class__.__name__}(key={self._key!r})"
else:
return f"{self.__class__.__name__}.from_file({self._file_repr})"
def __reduce__(self):
return (self.__class__._unpickle, (self._key, self._from_cache))
def _file_reduce(self):
import pickle
raise pickle.PicklingError(
"Cannot pickle a ZoneInfo file created from a file stream."
)
@classmethod
def _unpickle(cls, key, from_cache, /):
if from_cache:
return cls(key)
else:
return cls.no_cache(key)
def _find_tzfile(self, key):
return _tzpath.find_tzfile(key)
def _load_file(self, fobj):
# Retrieve all the data as it exists in the zoneinfo file
trans_idx, trans_utc, utcoff, isdst, abbr, tz_str = _common.load_data(
fobj
)
# Infer the DST offsets (needed for .dst()) from the data
dstoff = self._utcoff_to_dstoff(trans_idx, utcoff, isdst)
# Convert all the transition times (UTC) into "seconds since 1970-01-01 local time"
trans_local = self._ts_to_local(trans_idx, trans_utc, utcoff)
# Construct `_ttinfo` objects for each transition in the file
_ttinfo_list = [
_ttinfo(
_load_timedelta(utcoffset), _load_timedelta(dstoffset), tzname
)
for utcoffset, dstoffset, tzname in zip(utcoff, dstoff, abbr)
]
self._trans_utc = trans_utc
self._trans_local = trans_local
self._ttinfos = [_ttinfo_list[idx] for idx in trans_idx]
# Find the first non-DST transition
for i in range(len(isdst)):
if not isdst[i]:
self._tti_before = _ttinfo_list[i]
break
else:
if self._ttinfos:
self._tti_before = self._ttinfos[0]
else:
self._tti_before = None
# Set the "fallback" time zone
if tz_str is not None and tz_str != b"":
self._tz_after = _parse_tz_str(tz_str.decode())
else:
if not self._ttinfos and not _ttinfo_list:
raise ValueError("No time zone information found.")
if self._ttinfos:
self._tz_after = self._ttinfos[-1]
else:
self._tz_after = _ttinfo_list[-1]
# Determine if this is a "fixed offset" zone, meaning that the output
# of the utcoffset, dst and tzname functions does not depend on the
# specific datetime passed.
#
# We make three simplifying assumptions here:
#
# 1. If _tz_after is not a _ttinfo, it has transitions that might
# actually occur (it is possible to construct TZ strings that
# specify STD and DST but no transitions ever occur, such as
# AAA0BBB,0/0,J365/25).
# 2. If _ttinfo_list contains more than one _ttinfo object, the objects
# represent different offsets.
# 3. _ttinfo_list contains no unused _ttinfos (in which case an
# otherwise fixed-offset zone with extra _ttinfos defined may
# appear to *not* be a fixed offset zone).
#
# Violations to these assumptions would be fairly exotic, and exotic
# zones should almost certainly not be used with datetime.time (the
# only thing that would be affected by this).
if len(_ttinfo_list) > 1 or not isinstance(self._tz_after, _ttinfo):
self._fixed_offset = False
elif not _ttinfo_list:
self._fixed_offset = True
else:
self._fixed_offset = _ttinfo_list[0] == self._tz_after
@staticmethod
def _utcoff_to_dstoff(trans_idx, utcoffsets, isdsts):
# Now we must transform our ttis and abbrs into `_ttinfo` objects,
# but there is an issue: .dst() must return a timedelta with the
# difference between utcoffset() and the "standard" offset, but
# the "base offset" and "DST offset" are not encoded in the file;
# we can infer what they are from the isdst flag, but it is not
# sufficient to just look at the last standard offset, because
# occasionally countries will shift both DST offset and base offset.
typecnt = len(isdsts)
dstoffs = [0] * typecnt # Provisionally assign all to 0.
dst_cnt = sum(isdsts)
dst_found = 0
for i in range(1, len(trans_idx)):
if dst_cnt == dst_found:
break
idx = trans_idx[i]
dst = isdsts[idx]
# We're only going to look at daylight saving time
if not dst:
continue
# Skip any offsets that have already been assigned
if dstoffs[idx] != 0:
continue
dstoff = 0
utcoff = utcoffsets[idx]
comp_idx = trans_idx[i - 1]
if not isdsts[comp_idx]:
dstoff = utcoff - utcoffsets[comp_idx]
if not dstoff and idx < (typecnt - 1):
comp_idx = trans_idx[i + 1]
# If the following transition is also DST and we couldn't
# find the DST offset by this point, we're going to have to
# skip it and hope this transition gets assigned later
if isdsts[comp_idx]:
continue
dstoff = utcoff - utcoffsets[comp_idx]
if dstoff:
dst_found += 1
dstoffs[idx] = dstoff
else:
# If we didn't find a valid value for a given index, we'll end up
# with dstoff = 0 for something where `isdst=1`. This is obviously
# wrong - one hour will be a much better guess than 0
for idx in range(typecnt):
if not dstoffs[idx] and isdsts[idx]:
dstoffs[idx] = 3600
return dstoffs
@staticmethod
def _ts_to_local(trans_idx, trans_list_utc, utcoffsets):
"""Generate number of seconds since 1970 *in the local time*.
This is necessary to easily find the transition times in local time"""
if not trans_list_utc:
return [[], []]
# Start with the timestamps and modify in-place
trans_list_wall = [list(trans_list_utc), list(trans_list_utc)]
if len(utcoffsets) > 1:
offset_0 = utcoffsets[0]
offset_1 = utcoffsets[trans_idx[0]]
if offset_1 > offset_0:
offset_1, offset_0 = offset_0, offset_1
else:
offset_0 = offset_1 = utcoffsets[0]
trans_list_wall[0][0] += offset_0
trans_list_wall[1][0] += offset_1
for i in range(1, len(trans_idx)):
offset_0 = utcoffsets[trans_idx[i - 1]]
offset_1 = utcoffsets[trans_idx[i]]
if offset_1 > offset_0:
offset_1, offset_0 = offset_0, offset_1
trans_list_wall[0][i] += offset_0
trans_list_wall[1][i] += offset_1
return trans_list_wall
class _ttinfo:
__slots__ = ["utcoff", "dstoff", "tzname"]
def __init__(self, utcoff, dstoff, tzname):
self.utcoff = utcoff
self.dstoff = dstoff
self.tzname = tzname
def __eq__(self, other):
return (
self.utcoff == other.utcoff
and self.dstoff == other.dstoff
and self.tzname == other.tzname
)
def __repr__(self): # pragma: nocover
return (
f"{self.__class__.__name__}"
+ f"({self.utcoff}, {self.dstoff}, {self.tzname})"
)
_NO_TTINFO = _ttinfo(None, None, None)
class _TZStr:
__slots__ = (
"std",
"dst",
"start",
"end",
"get_trans_info",
"get_trans_info_fromutc",
"dst_diff",
)
def __init__(
self, std_abbr, std_offset, dst_abbr, dst_offset, start=None, end=None
):
self.dst_diff = dst_offset - std_offset
std_offset = _load_timedelta(std_offset)
self.std = _ttinfo(
utcoff=std_offset, dstoff=_load_timedelta(0), tzname=std_abbr
)
self.start = start
self.end = end
dst_offset = _load_timedelta(dst_offset)
delta = _load_timedelta(self.dst_diff)
self.dst = _ttinfo(utcoff=dst_offset, dstoff=delta, tzname=dst_abbr)
# These are assertions because the constructor should only be called
# by functions that would fail before passing start or end
assert start is not None, "No transition start specified"
assert end is not None, "No transition end specified"
self.get_trans_info = self._get_trans_info
self.get_trans_info_fromutc = self._get_trans_info_fromutc
def transitions(self, year):
start = self.start.year_to_epoch(year)
end = self.end.year_to_epoch(year)
return start, end
def _get_trans_info(self, ts, year, fold):
"""Get the information about the current transition - tti"""
start, end = self.transitions(year)
# With fold = 0, the period (denominated in local time) with the
# smaller offset starts at the end of the gap and ends at the end of
# the fold; with fold = 1, it runs from the start of the gap to the
# beginning of the fold.
#
# So in order to determine the DST boundaries we need to know both
# the fold and whether DST is positive or negative (rare), and it
# turns out that this boils down to fold XOR is_positive.
if fold == (self.dst_diff >= 0):
end -= self.dst_diff
else:
start += self.dst_diff
if start < end:
isdst = start <= ts < end
else:
isdst = not (end <= ts < start)
return self.dst if isdst else self.std
def _get_trans_info_fromutc(self, ts, year):
start, end = self.transitions(year)
start -= self.std.utcoff.total_seconds()
end -= self.dst.utcoff.total_seconds()
if start < end:
isdst = start <= ts < end
else:
isdst = not (end <= ts < start)
# For positive DST, the ambiguous period is one dst_diff after the end
# of DST; for negative DST, the ambiguous period is one dst_diff before
# the start of DST.
if self.dst_diff > 0:
ambig_start = end
ambig_end = end + self.dst_diff
else:
ambig_start = start
ambig_end = start - self.dst_diff
fold = ambig_start <= ts < ambig_end
return (self.dst if isdst else self.std, fold)
def _post_epoch_days_before_year(year):
"""Get the number of days between 1970-01-01 and YEAR-01-01"""
y = year - 1
return y * 365 + y // 4 - y // 100 + y // 400 - EPOCHORDINAL
class _DayOffset:
__slots__ = ["d", "julian", "hour", "minute", "second"]
def __init__(self, d, julian, hour=2, minute=0, second=0):
min_day = 0 + julian # convert bool to int
if not min_day <= d <= 365:
raise ValueError(f"d must be in [{min_day}, 365], not: {d}")
self.d = d
self.julian = julian
self.hour = hour
self.minute = minute
self.second = second
def year_to_epoch(self, year):
days_before_year = _post_epoch_days_before_year(year)
d = self.d
if self.julian and d >= 59 and calendar.isleap(year):
d += 1
epoch = (days_before_year + d) * 86400
epoch += self.hour * 3600 + self.minute * 60 + self.second
return epoch
class _CalendarOffset:
__slots__ = ["m", "w", "d", "hour", "minute", "second"]
_DAYS_BEFORE_MONTH = (
-1,
0,
31,
59,
90,
120,
151,
181,
212,
243,
273,
304,
334,
)
def __init__(self, m, w, d, hour=2, minute=0, second=0):
if not 1 <= m <= 12:
raise ValueError("m must be in [1, 12]")
if not 1 <= w <= 5:
raise ValueError("w must be in [1, 5]")
if not 0 <= d <= 6:
raise ValueError("d must be in [0, 6]")
self.m = m
self.w = w
self.d = d
self.hour = hour
self.minute = minute
self.second = second
@classmethod
def _ymd2ord(cls, year, month, day):
return (
_post_epoch_days_before_year(year)
+ cls._DAYS_BEFORE_MONTH[month]
+ (month > 2 and calendar.isleap(year))
+ day
)
# TODO: These are not actually epoch dates as they are expressed in local time
def year_to_epoch(self, year):
"""Calculates the datetime of the occurrence from the year"""
# We know year and month, we need to convert w, d into day of month
#
# Week 1 is the first week in which day `d` (where 0 = Sunday) appears.
# Week 5 represents the last occurrence of day `d`, so we need to know
# the range of the month.
first_day, days_in_month = calendar.monthrange(year, self.m)
# This equation seems magical, so I'll break it down:
# 1. calendar says 0 = Monday, POSIX says 0 = Sunday
# so we need first_day + 1 to get 1 = Monday -> 7 = Sunday,
# which is still equivalent because this math is mod 7
# 2. Get first day - desired day mod 7: -1 % 7 = 6, so we don't need
# to do anything to adjust negative numbers.
# 3. Add 1 because month days are a 1-based index.
month_day = (self.d - (first_day + 1)) % 7 + 1
# Now use a 0-based index version of `w` to calculate the w-th
# occurrence of `d`
month_day += (self.w - 1) * 7
# month_day will only be > days_in_month if w was 5, and `w` means
# "last occurrence of `d`", so now we just check if we over-shot the
# end of the month and if so knock off 1 week.
if month_day > days_in_month:
month_day -= 7
ordinal = self._ymd2ord(year, self.m, month_day)
epoch = ordinal * 86400
epoch += self.hour * 3600 + self.minute * 60 + self.second
return epoch
def _parse_tz_str(tz_str):
# The tz string has the format:
#
# std[offset[dst[offset],start[/time],end[/time]]]
#
# std and dst must be 3 or more characters long and must not contain
# a leading colon, embedded digits, commas, nor a plus or minus signs;
# The spaces between "std" and "offset" are only for display and are
# not actually present in the string.
#
# The format of the offset is ``[+|-]hh[:mm[:ss]]``
offset_str, *start_end_str = tz_str.split(",", 1)
parser_re = re.compile(
r"""
(?P<std>[^<0-9:.+-]+|<[a-zA-Z0-9+-]+>)
(?:
(?P<stdoff>[+-]?\d{1,3}(?::\d{2}(?::\d{2})?)?)
(?:
(?P<dst>[^0-9:.+-]+|<[a-zA-Z0-9+-]+>)
(?P<dstoff>[+-]?\d{1,3}(?::\d{2}(?::\d{2})?)?)?
)? # dst
)? # stdoff
""",
re.ASCII|re.VERBOSE
)
m = parser_re.fullmatch(offset_str)
if m is None:
raise ValueError(f"{tz_str} is not a valid TZ string")
std_abbr = m.group("std")
dst_abbr = m.group("dst")
dst_offset = None
std_abbr = std_abbr.strip("<>")
if dst_abbr:
dst_abbr = dst_abbr.strip("<>")
if std_offset := m.group("stdoff"):
try:
std_offset = _parse_tz_delta(std_offset)
except ValueError as e:
raise ValueError(f"Invalid STD offset in {tz_str}") from e
else:
std_offset = 0
if dst_abbr is not None:
if dst_offset := m.group("dstoff"):
try:
dst_offset = _parse_tz_delta(dst_offset)
except ValueError as e:
raise ValueError(f"Invalid DST offset in {tz_str}") from e
else:
dst_offset = std_offset + 3600
if not start_end_str:
raise ValueError(f"Missing transition rules: {tz_str}")
start_end_strs = start_end_str[0].split(",", 1)
try:
start, end = (_parse_dst_start_end(x) for x in start_end_strs)
except ValueError as e:
raise ValueError(f"Invalid TZ string: {tz_str}") from e
return _TZStr(std_abbr, std_offset, dst_abbr, dst_offset, start, end)
elif start_end_str:
raise ValueError(f"Transition rule present without DST: {tz_str}")
else:
# This is a static ttinfo, don't return _TZStr
return _ttinfo(
_load_timedelta(std_offset), _load_timedelta(0), std_abbr
)
def _parse_dst_start_end(dststr):
date, *time = dststr.split("/", 1)
type = date[:1]
if type == "M":
n_is_julian = False
m = re.fullmatch(r"M(\d{1,2})\.(\d).(\d)", date, re.ASCII)
if m is None:
raise ValueError(f"Invalid dst start/end date: {dststr}")
date_offset = tuple(map(int, m.groups()))
offset = _CalendarOffset(*date_offset)
else:
if type == "J":
n_is_julian = True
date = date[1:]
else:
n_is_julian = False
doy = int(date)
offset = _DayOffset(doy, n_is_julian)
if time:
offset.hour, offset.minute, offset.second = _parse_transition_time(time[0])
return offset
def _parse_transition_time(time_str):
match = re.fullmatch(
r"(?P<sign>[+-])?(?P<h>\d{1,3})(:(?P<m>\d{2})(:(?P<s>\d{2}))?)?",
time_str,
re.ASCII
)
if match is None:
raise ValueError(f"Invalid time: {time_str}")
h, m, s = (int(v or 0) for v in match.group("h", "m", "s"))
if h > 167:
raise ValueError(
f"Hour must be in [0, 167]: {time_str}"
)
if match.group("sign") == "-":
h, m, s = -h, -m, -s
return h, m, s
def _parse_tz_delta(tz_delta):
match = re.fullmatch(
r"(?P<sign>[+-])?(?P<h>\d{1,3})(:(?P<m>\d{2})(:(?P<s>\d{2}))?)?",
tz_delta,
re.ASCII
)
# Anything passed to this function should already have hit an equivalent
# regular expression to find the section to parse.
assert match is not None, tz_delta
h, m, s = (int(v or 0) for v in match.group("h", "m", "s"))
total = h * 3600 + m * 60 + s
if h > 24:
raise ValueError(
f"Offset hours must be in [0, 24]: {tz_delta}"
)
# Yes, +5 maps to an offset of -5h
if match.group("sign") != "-":
total = -total
return total