Update statistics.py to CPython 3.10

This commit is contained in:
Padraic Fanning
2022-02-12 23:25:34 -05:00
parent db788936c6
commit 78d543e776

415
Lib/statistics.py vendored
View File

@@ -73,6 +73,30 @@ second argument to the four "spread" functions to avoid recalculating it:
2.5
Statistics for relations between two inputs
-------------------------------------------
================== ====================================================
Function Description
================== ====================================================
covariance Sample covariance for two variables.
correlation Pearson's correlation coefficient for two variables.
linear_regression Intercept and slope for simple linear regression.
================== ====================================================
Calculate covariance, Pearson's correlation, and simple linear regression
for two inputs:
>>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> y = [1, 2, 3, 1, 2, 3, 1, 2, 3]
>>> covariance(x, y)
0.75
>>> correlation(x, y) #doctest: +ELLIPSIS
0.31622776601...
>>> linear_regression(x, y) #doctest:
LinearRegression(slope=0.1, intercept=1.5)
Exceptions
----------
@@ -83,9 +107,12 @@ A single exception is defined: StatisticsError is a subclass of ValueError.
__all__ = [
'NormalDist',
'StatisticsError',
'correlation',
'covariance',
'fmean',
'geometric_mean',
'harmonic_mean',
'linear_regression',
'mean',
'median',
'median_grouped',
@@ -106,11 +133,11 @@ import random
from fractions import Fraction
from decimal import Decimal
from itertools import groupby
from itertools import groupby, repeat
from bisect import bisect_left, bisect_right
from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum
from operator import itemgetter
from collections import Counter
from collections import Counter, namedtuple
# === Exceptions ===
@@ -120,21 +147,17 @@ class StatisticsError(ValueError):
# === Private utilities ===
def _sum(data, start=0):
"""_sum(data [, start]) -> (type, sum, count)
def _sum(data):
"""_sum(data) -> (type, sum, count)
Return a high-precision sum of the given numeric data as a fraction,
together with the type to be converted to and the count of items.
If optional argument ``start`` is given, it is added to the total.
If ``data`` is empty, ``start`` (defaulting to 0) is returned.
Examples
--------
>>> _sum([3, 2.25, 4.5, -0.5, 1.0], 0.75)
(<class 'float'>, Fraction(11, 1), 5)
>>> _sum([3, 2.25, 4.5, -0.5, 0.25])
(<class 'float'>, Fraction(19, 2), 5)
Some sources of round-off error will be avoided:
@@ -157,13 +180,12 @@ def _sum(data, start=0):
allowed.
"""
count = 0
n, d = _exact_ratio(start)
partials = {d: n}
partials = {}
partials_get = partials.get
T = _coerce(int, type(start))
T = int
for typ, values in groupby(data, type):
T = _coerce(T, typ) # or raise TypeError
for n,d in map(_exact_ratio, values):
for n, d in map(_exact_ratio, values):
count += 1
partials[d] = partials_get(d, 0) + n
if None in partials:
@@ -173,8 +195,7 @@ def _sum(data, start=0):
assert not _isfinite(total)
else:
# Sum all the partial sums using builtin sum.
# FIXME is this faster if we sum them in order of the denominator?
total = sum(Fraction(n, d) for d, n in sorted(partials.items()))
total = sum(Fraction(n, d) for d, n in partials.items())
return (T, total, count)
@@ -225,27 +246,19 @@ def _exact_ratio(x):
x is expected to be an int, Fraction, Decimal or float.
"""
try:
# Optimise the common case of floats. We expect that the most often
# used numeric type will be builtin floats, so try to make this as
# fast as possible.
if type(x) is float or type(x) is Decimal:
return x.as_integer_ratio()
try:
# x may be an int, Fraction, or Integral ABC.
return (x.numerator, x.denominator)
except AttributeError:
try:
# x may be a float or Decimal subclass.
return x.as_integer_ratio()
except AttributeError:
# Just give up?
pass
return x.as_integer_ratio()
except AttributeError:
pass
except (OverflowError, ValueError):
# float NAN or INF.
assert not _isfinite(x)
return (x, None)
msg = "can't convert type '{}' to numerator/denominator"
raise TypeError(msg.format(type(x).__name__))
try:
# x may be an Integral ABC.
return (x.numerator, x.denominator)
except AttributeError:
msg = f"can't convert type '{type(x).__name__}' to numerator/denominator"
raise TypeError(msg)
def _convert(value, T):
@@ -261,7 +274,7 @@ def _convert(value, T):
return T(value)
except TypeError:
if issubclass(T, Decimal):
return T(value.numerator)/T(value.denominator)
return T(value.numerator) / T(value.denominator)
else:
raise
@@ -277,8 +290,8 @@ def _find_lteq(a, x):
def _find_rteq(a, l, x):
'Locate the rightmost value exactly equal to x'
i = bisect_right(a, x, lo=l)
if i != (len(a)+1) and a[i-1] == x:
return i-1
if i != (len(a) + 1) and a[i - 1] == x:
return i - 1
raise ValueError
@@ -315,7 +328,7 @@ def mean(data):
raise StatisticsError('mean requires at least one data point')
T, total, count = _sum(data)
assert count == n
return _convert(total/n, T)
return _convert(total / n, T)
def fmean(data):
@@ -361,40 +374,39 @@ def geometric_mean(data):
return exp(fmean(map(log, data)))
except ValueError:
raise StatisticsError('geometric mean requires a non-empty dataset '
' containing positive numbers') from None
'containing positive numbers') from None
def harmonic_mean(data):
def harmonic_mean(data, weights=None):
"""Return the harmonic mean of data.
The harmonic mean, sometimes called the subcontrary mean, is the
reciprocal of the arithmetic mean of the reciprocals of the data,
and is often appropriate when averaging quantities which are rates
or ratios, for example speeds. Example:
The harmonic mean is the reciprocal of the arithmetic mean of the
reciprocals of the data. It can be used for averaging ratios or
rates, for example speeds.
Suppose an investor purchases an equal value of shares in each of
three companies, with P/E (price/earning) ratios of 2.5, 3 and 10.
What is the average P/E ratio for the investor's portfolio?
Suppose a car travels 40 km/hr for 5 km and then speeds-up to
60 km/hr for another 5 km. What is the average speed?
>>> harmonic_mean([2.5, 3, 10]) # For an equal investment portfolio.
3.6
>>> harmonic_mean([40, 60])
48.0
Using the arithmetic mean would give an average of about 5.167, which
is too high.
Suppose a car travels 40 km/hr for 5 km, and when traffic clears,
speeds-up to 60 km/hr for the remaining 30 km of the journey. What
is the average speed?
>>> harmonic_mean([40, 60], weights=[5, 30])
56.0
If ``data`` is empty, or any element is less than zero,
``harmonic_mean`` will raise ``StatisticsError``.
"""
# For a justification for using harmonic mean for P/E ratios, see
# http://fixthepitch.pellucid.com/comps-analysis-the-missing-harmony-of-summary-statistics/
# http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2621087
if iter(data) is data:
data = list(data)
errmsg = 'harmonic mean does not support negative values'
n = len(data)
if n < 1:
raise StatisticsError('harmonic_mean requires at least one data point')
elif n == 1:
elif n == 1 and weights is None:
x = data[0]
if isinstance(x, (numbers.Real, Decimal)):
if x < 0:
@@ -402,13 +414,23 @@ def harmonic_mean(data):
return x
else:
raise TypeError('unsupported type')
if weights is None:
weights = repeat(1, n)
sum_weights = n
else:
if iter(weights) is weights:
weights = list(weights)
if len(weights) != n:
raise StatisticsError('Number of weights does not match data size')
_, sum_weights, _ = _sum(w for w in _fail_neg(weights, errmsg))
try:
T, total, count = _sum(1/x for x in _fail_neg(data, errmsg))
data = _fail_neg(data, errmsg)
T, total, count = _sum(w / x if w else 0 for w, x in zip(weights, data))
except ZeroDivisionError:
return 0
assert count == n
return _convert(n/total, T)
if total <= 0:
raise StatisticsError('Weighted sum must be positive')
return _convert(sum_weights / total, T)
# FIXME: investigate ways to calculate medians without sorting? Quickselect?
def median(data):
@@ -428,11 +450,11 @@ def median(data):
n = len(data)
if n == 0:
raise StatisticsError("no median for empty data")
if n%2 == 1:
return data[n//2]
if n % 2 == 1:
return data[n // 2]
else:
i = n//2
return (data[i - 1] + data[i])/2
i = n // 2
return (data[i - 1] + data[i]) / 2
def median_low(data):
@@ -451,10 +473,10 @@ def median_low(data):
n = len(data)
if n == 0:
raise StatisticsError("no median for empty data")
if n%2 == 1:
return data[n//2]
if n % 2 == 1:
return data[n // 2]
else:
return data[n//2 - 1]
return data[n // 2 - 1]
def median_high(data):
@@ -473,7 +495,7 @@ def median_high(data):
n = len(data)
if n == 0:
raise StatisticsError("no median for empty data")
return data[n//2]
return data[n // 2]
def median_grouped(data, interval=1):
@@ -510,15 +532,15 @@ def median_grouped(data, interval=1):
return data[0]
# Find the value at the midpoint. Remember this corresponds to the
# centre of the class interval.
x = data[n//2]
x = data[n // 2]
for obj in (x, interval):
if isinstance(obj, (str, bytes)):
raise TypeError('expected number but got %r' % obj)
try:
L = x - interval/2 # The lower limit of the median interval.
L = x - interval / 2 # The lower limit of the median interval.
except TypeError:
# Mixed type. For now we just coerce to float.
L = float(x) - float(interval)/2
L = float(x) - float(interval) / 2
# Uses bisection search to search for x in data with log(n) time complexity
# Find the position of leftmost occurrence of x in data
@@ -528,7 +550,7 @@ def median_grouped(data, interval=1):
l2 = _find_rteq(data, l1, x)
cf = l1
f = l2 - l1 + 1
return L + interval*(n/2 - cf)/f
return L + interval * (n / 2 - cf) / f
def mode(data):
@@ -554,8 +576,7 @@ def mode(data):
If *data* is empty, ``mode``, raises StatisticsError.
"""
data = iter(data)
pairs = Counter(data).most_common(1)
pairs = Counter(iter(data)).most_common(1)
try:
return pairs[0][0]
except IndexError:
@@ -597,7 +618,7 @@ def multimode(data):
# For sample data where there is a positive probability for values
# beyond the range of the data, the R6 exclusive method is a
# reasonable choice. Consider a random sample of nine values from a
# population with a uniform distribution from 0.0 to 100.0. The
# population with a uniform distribution from 0.0 to 1.0. The
# distribution of the third ranked sample point is described by
# betavariate(alpha=3, beta=7) which has mode=0.250, median=0.286, and
# mean=0.300. Only the latter (which corresponds with R6) gives the
@@ -643,9 +664,8 @@ def quantiles(data, *, n=4, method='exclusive'):
m = ld - 1
result = []
for i in range(1, n):
j = i * m // n
delta = i*m - j*n
interpolated = (data[j] * (n - delta) + data[j+1] * delta) / n
j, delta = divmod(i * m, n)
interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
result.append(interpolated)
return result
if method == 'exclusive':
@@ -655,7 +675,7 @@ def quantiles(data, *, n=4, method='exclusive'):
j = i * m // n # rescale i to m/n
j = 1 if j < 1 else ld-1 if j > ld-1 else j # clamp to 1 .. ld-1
delta = i*m - j*n # exact integer math
interpolated = (data[j-1] * (n - delta) + data[j] * delta) / n
interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
result.append(interpolated)
return result
raise ValueError(f'Unknown method: {method!r}')
@@ -685,14 +705,20 @@ def _ss(data, c=None):
if c is not None:
T, total, count = _sum((x-c)**2 for x in data)
return (T, total)
c = mean(data)
T, total, count = _sum((x-c)**2 for x in data)
# The following sum should mathematically equal zero, but due to rounding
# error may not.
U, total2, count2 = _sum((x-c) for x in data)
assert T == U and count == count2
total -= total2**2/len(data)
assert not total < 0, 'negative sum of square deviations: %f' % total
T, total, count = _sum(data)
mean_n, mean_d = (total / count).as_integer_ratio()
partials = Counter()
for n, d in map(_exact_ratio, data):
diff_n = n * mean_d - d * mean_n
diff_d = d * mean_d
partials[diff_d * diff_d] += diff_n * diff_n
if None in partials:
# The sum will be a NAN or INF. We can ignore all the finite
# partials, and just look at this special one.
total = partials[None]
assert not _isfinite(total)
else:
total = sum(Fraction(n, d) for d, n in partials.items())
return (T, total)
@@ -740,7 +766,7 @@ def variance(data, xbar=None):
if n < 2:
raise StatisticsError('variance requires at least two data points')
T, ss = _ss(data, xbar)
return _convert(ss/(n-1), T)
return _convert(ss / (n - 1), T)
def pvariance(data, mu=None):
@@ -784,7 +810,7 @@ def pvariance(data, mu=None):
if n < 1:
raise StatisticsError('pvariance requires at least one data point')
T, ss = _ss(data, mu)
return _convert(ss/n, T)
return _convert(ss / n, T)
def stdev(data, xbar=None):
@@ -796,6 +822,9 @@ def stdev(data, xbar=None):
1.0810874155219827
"""
# Fixme: Despite the exact sum of squared deviations, some inaccuracy
# remain because there are two rounding steps. The first occurs in
# the _convert() step for variance(), the second occurs in math.sqrt().
var = variance(data, xbar)
try:
return var.sqrt()
@@ -812,6 +841,9 @@ def pstdev(data, mu=None):
0.986893273527251
"""
# Fixme: Despite the exact sum of squared deviations, some inaccuracy
# remain because there are two rounding steps. The first occurs in
# the _convert() step for pvariance(), the second occurs in math.sqrt().
var = pvariance(data, mu)
try:
return var.sqrt()
@@ -819,6 +851,119 @@ def pstdev(data, mu=None):
return math.sqrt(var)
# === Statistics for relations between two inputs ===
# See https://en.wikipedia.org/wiki/Covariance
# https://en.wikipedia.org/wiki/Pearson_correlation_coefficient
# https://en.wikipedia.org/wiki/Simple_linear_regression
def covariance(x, y, /):
"""Covariance
Return the sample covariance of two inputs *x* and *y*. Covariance
is a measure of the joint variability of two inputs.
>>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> y = [1, 2, 3, 1, 2, 3, 1, 2, 3]
>>> covariance(x, y)
0.75
>>> z = [9, 8, 7, 6, 5, 4, 3, 2, 1]
>>> covariance(x, z)
-7.5
>>> covariance(z, x)
-7.5
"""
n = len(x)
if len(y) != n:
raise StatisticsError('covariance requires that both inputs have same number of data points')
if n < 2:
raise StatisticsError('covariance requires at least two data points')
xbar = fsum(x) / n
ybar = fsum(y) / n
sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y))
return sxy / (n - 1)
def correlation(x, y, /):
"""Pearson's correlation coefficient
Return the Pearson's correlation coefficient for two inputs. Pearson's
correlation coefficient *r* takes values between -1 and +1. It measures the
strength and direction of the linear relationship, where +1 means very
strong, positive linear relationship, -1 very strong, negative linear
relationship, and 0 no linear relationship.
>>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> y = [9, 8, 7, 6, 5, 4, 3, 2, 1]
>>> correlation(x, x)
1.0
>>> correlation(x, y)
-1.0
"""
n = len(x)
if len(y) != n:
raise StatisticsError('correlation requires that both inputs have same number of data points')
if n < 2:
raise StatisticsError('correlation requires at least two data points')
xbar = fsum(x) / n
ybar = fsum(y) / n
sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y))
sxx = fsum((xi - xbar) ** 2.0 for xi in x)
syy = fsum((yi - ybar) ** 2.0 for yi in y)
try:
return sxy / sqrt(sxx * syy)
except ZeroDivisionError:
raise StatisticsError('at least one of the inputs is constant')
LinearRegression = namedtuple('LinearRegression', ('slope', 'intercept'))
def linear_regression(x, y, /):
"""Slope and intercept for simple linear regression.
Return the slope and intercept of simple linear regression
parameters estimated using ordinary least squares. Simple linear
regression describes relationship between an independent variable
*x* and a dependent variable *y* in terms of linear function:
y = slope * x + intercept + noise
where *slope* and *intercept* are the regression parameters that are
estimated, and noise represents the variability of the data that was
not explained by the linear regression (it is equal to the
difference between predicted and actual values of the dependent
variable).
The parameters are returned as a named tuple.
>>> x = [1, 2, 3, 4, 5]
>>> noise = NormalDist().samples(5, seed=42)
>>> y = [3 * x[i] + 2 + noise[i] for i in range(5)]
>>> linear_regression(x, y) #doctest: +ELLIPSIS
LinearRegression(slope=3.09078914170..., intercept=1.75684970486...)
"""
n = len(x)
if len(y) != n:
raise StatisticsError('linear regression requires that both inputs have same number of data points')
if n < 2:
raise StatisticsError('linear regression requires at least two data points')
xbar = fsum(x) / n
ybar = fsum(y) / n
sxy = fsum((xi - xbar) * (yi - ybar) for xi, yi in zip(x, y))
sxx = fsum((xi - xbar) ** 2.0 for xi in x)
try:
slope = sxy / sxx # equivalent to: covariance(x, y) / variance(x)
except ZeroDivisionError:
raise StatisticsError('x is constant')
intercept = ybar - slope * xbar
return LinearRegression(slope=slope, intercept=intercept)
## Normal Distribution #####################################################
@@ -896,6 +1041,13 @@ def _normal_dist_inv_cdf(p, mu, sigma):
return mu + (x * sigma)
# If available, use C implementation
try:
from _statistics import _normal_dist_inv_cdf
except ImportError:
pass
class NormalDist:
"Normal distribution of a random variable"
# https://en.wikipedia.org/wiki/Normal_distribution
@@ -986,7 +1138,7 @@ class NormalDist:
if not isinstance(other, NormalDist):
raise TypeError('Expected another NormalDist instance')
X, Y = self, other
if (Y._sigma, Y._mu) < (X._sigma, X._mu): # sort to assure commutativity
if (Y._sigma, Y._mu) < (X._sigma, X._mu): # sort to assure commutativity
X, Y = Y, X
X_var, Y_var = X.variance, Y.variance
if not X_var or not Y_var:
@@ -1001,6 +1153,17 @@ class NormalDist:
x2 = (a - b) / dv
return 1.0 - (fabs(Y.cdf(x1) - X.cdf(x1)) + fabs(Y.cdf(x2) - X.cdf(x2)))
def zscore(self, x):
"""Compute the Standard Score. (x - mean) / stdev
Describes *x* in terms of the number of standard deviations
above or below the mean of the normal distribution.
"""
# https://www.statisticshowto.com/probability-and-statistics/z-score/
if not self._sigma:
raise StatisticsError('zscore() not defined when sigma is zero')
return (x - self._mu) / self._sigma
@property
def mean(self):
"Arithmetic mean of the normal distribution."
@@ -1102,79 +1265,3 @@ class NormalDist:
def __repr__(self):
return f'{type(self).__name__}(mu={self._mu!r}, sigma={self._sigma!r})'
# If available, use C implementation
try:
from _statistics import _normal_dist_inv_cdf
except ImportError:
pass
if __name__ == '__main__':
# Show math operations computed analytically in comparsion
# to a monte carlo simulation of the same operations
from math import isclose
from operator import add, sub, mul, truediv
from itertools import repeat
import doctest
g1 = NormalDist(10, 20)
g2 = NormalDist(-5, 25)
# Test scaling by a constant
assert (g1 * 5 / 5).mean == g1.mean
assert (g1 * 5 / 5).stdev == g1.stdev
n = 100_000
G1 = g1.samples(n)
G2 = g2.samples(n)
for func in (add, sub):
print(f'\nTest {func.__name__} with another NormalDist:')
print(func(g1, g2))
print(NormalDist.from_samples(map(func, G1, G2)))
const = 11
for func in (add, sub, mul, truediv):
print(f'\nTest {func.__name__} with a constant:')
print(func(g1, const))
print(NormalDist.from_samples(map(func, G1, repeat(const))))
const = 19
for func in (add, sub, mul):
print(f'\nTest constant with {func.__name__}:')
print(func(const, g1))
print(NormalDist.from_samples(map(func, repeat(const), G1)))
def assert_close(G1, G2):
assert isclose(G1.mean, G1.mean, rel_tol=0.01), (G1, G2)
assert isclose(G1.stdev, G2.stdev, rel_tol=0.01), (G1, G2)
X = NormalDist(-105, 73)
Y = NormalDist(31, 47)
s = 32.75
n = 100_000
S = NormalDist.from_samples([x + s for x in X.samples(n)])
assert_close(X + s, S)
S = NormalDist.from_samples([x - s for x in X.samples(n)])
assert_close(X - s, S)
S = NormalDist.from_samples([x * s for x in X.samples(n)])
assert_close(X * s, S)
S = NormalDist.from_samples([x / s for x in X.samples(n)])
assert_close(X / s, S)
S = NormalDist.from_samples([x + y for x, y in zip(X.samples(n),
Y.samples(n))])
assert_close(X + Y, S)
S = NormalDist.from_samples([x - y for x, y in zip(X.samples(n),
Y.samples(n))])
assert_close(X - Y, S)
print(doctest.testmod())