# Source code for runstats.core

"""Python RunStats

Compute Statistics and Regression in a single pass.

"""

from __future__ import division


class Statistics(object):
    """Compute statistics in a single pass.

    Computes the minimum, maximum, mean, variance, standard deviation,
    skewness, and kurtosis.
    Statistics objects may also be added together and copied.

    Based entirely on the C++ code by John D Cook at
    http://www.johndcook.com/skewness_kurtosis.html

    Internal state:

    * ``_count`` -- number of values pushed (kept as a float so that it can
      be re-weighted by ``*``).
    * ``_eta`` -- running mean.
    * ``_rho``, ``_tau``, ``_phi`` -- running sums of the second, third and
      fourth powers of the deviations from the mean.
    * ``_min``, ``_max`` -- extremes seen so far; NaN while empty.

    """

    def __init__(self, iterable=()):
        """Initialize Statistics object.

        Iterates optional parameter `iterable` and pushes each value into the
        statistics summary.

        """
        self.clear()
        for value in iterable:
            self.push(value)

    def clear(self):
        """Clear Statistics object."""
        self._count = self._eta = self._rho = self._tau = self._phi = 0.0
        self._min = self._max = float('nan')

    @classmethod
    def _states_equal(cls, lhs, rhs):
        """Return True when two state values match, treating NaN as equal.

        An empty summary stores NaN for its minimum and maximum, and NaN never
        compares equal to itself, so a plain tuple comparison would report two
        empty summaries as different.

        """
        if isinstance(lhs, tuple) and isinstance(rhs, tuple):
            return len(lhs) == len(rhs) and all(
                cls._states_equal(one, two) for one, two in zip(lhs, rhs)
            )
        # `x != x` is only true for NaN.
        return lhs == rhs or (lhs != lhs and rhs != rhs)

    def __eq__(self, that):
        """Return True when `that` carries the same internal state.

        Returns NotImplemented for operands that expose no `get_state`, so
        comparing against unrelated objects yields False instead of raising.

        """
        get_state = getattr(that, 'get_state', None)
        if get_state is None:
            return NotImplemented
        return self._states_equal(self.get_state(), get_state())

    def __ne__(self, that):
        """Return the negation of `__eq__` (needed explicitly on Python 2)."""
        equal = self.__eq__(that)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

    def get_state(self):
        """Get internal state.

        Returns the tuple
        ``(count, eta, rho, tau, phi, minimum, maximum)``.

        """
        return (
            self._count,
            self._eta,
            self._rho,
            self._tau,
            self._phi,
            self._min,
            self._max
        )

    def set_state(self, state):
        """Set internal state from a 7-item sequence as made by `get_state`."""
        (
            self._count,
            self._eta,
            self._rho,
            self._tau,
            self._phi,
            self._min,
            self._max
        ) = state

    @classmethod
    def fromstate(cls, state):
        """Return Statistics object from state."""
        stats = cls()
        stats.set_state(state)
        return stats

    def __reduce__(self):
        """Support pickling by rebuilding from the state tuple."""
        return make_statistics, (self.get_state(),)

    def copy(self, _=None):
        """Copy Statistics object.

        The ignored positional parameter lets this method double as
        `__deepcopy__`, which is called with a memo dictionary.

        """
        return self.fromstate(self.get_state())

    __copy__ = copy
    __deepcopy__ = copy

    def __len__(self):
        """Number of values that have been pushed."""
        return int(self._count)

    def push(self, value):
        """Add `value` to the Statistics summary.

        `value` is converted with `float` before use.

        """
        value = float(value)

        if self._count == 0.0:
            self._min = value
            self._max = value
        else:
            self._min = min(self._min, value)
            self._max = max(self._max, value)

        delta = value - self._eta
        delta_n = delta / (self._count + 1)
        delta_n2 = delta_n * delta_n
        term = delta * delta_n * self._count

        self._count += 1
        self._eta += delta_n
        # Order matters: the higher moments are updated first because each
        # update reads the previous values of the lower ones.
        self._phi += (
            term * delta_n2 * (self._count ** 2 - 3 * self._count + 3)
            + 6 * delta_n2 * self._rho
            - 4 * delta_n * self._tau
        )
        self._tau += (
            term * delta_n * (self._count - 2) - 3 * delta_n * self._rho
        )
        self._rho += term

    def minimum(self):
        """Minimum of values (NaN when no value has been pushed)."""
        return self._min

    def maximum(self):
        """Maximum of values (NaN when no value has been pushed)."""
        return self._max

    def mean(self):
        """Mean of values."""
        return self._eta

    def variance(self, ddof=1.0):
        """Variance of values (with `ddof` degrees of freedom).

        Raises ZeroDivisionError when the count equals `ddof`.

        """
        return self._rho / (self._count - ddof)

    def stddev(self, ddof=1.0):
        """Standard deviation of values (with `ddof` degrees of freedom)."""
        return self.variance(ddof) ** 0.5

    def skewness(self):
        """Skewness of values.

        Raises ZeroDivisionError when the values have no spread.

        """
        return (self._count ** 0.5) * self._tau / pow(self._rho, 1.5)

    def kurtosis(self):
        """Kurtosis of values (excess kurtosis: 3.0 is subtracted).

        Raises ZeroDivisionError when the values have no spread.

        """
        return self._count * self._phi / (self._rho * self._rho) - 3.0

    def __add__(self, that):
        """Add two Statistics objects together."""
        sigma = self.copy()
        sigma += that
        return sigma

    def __iadd__(self, that):
        """Add another Statistics object to this one."""
        sum_count = self._count + that._count
        if sum_count == 0:
            return self

        delta = that._eta - self._eta
        delta2 = delta ** 2
        delta3 = delta ** 3
        delta4 = delta ** 4

        # Every combined value is computed before any attribute is assigned,
        # so the formulas always read the pre-merge state of both operands.
        sum_eta = (
            (self._count * self._eta + that._count * that._eta) / sum_count
        )

        sum_rho = (
            self._rho + that._rho
            + delta2 * self._count * that._count / sum_count
        )

        sum_tau = (
            self._tau + that._tau
            + delta3 * self._count * that._count
            * (self._count - that._count) / (sum_count ** 2)
            + 3.0 * delta
            * (self._count * that._rho - that._count * self._rho) / sum_count
        )

        sum_phi = (
            self._phi + that._phi
            + delta4 * self._count * that._count
            * (self._count ** 2 - self._count * that._count + that._count ** 2)
            / (sum_count ** 3)
            + 6.0 * delta2 * (
                self._count * self._count * that._rho
                + that._count * that._count * self._rho
            ) / (sum_count ** 2)
            + 4.0 * delta
            * (self._count * that._tau - that._count * self._tau) / sum_count
        )

        if self._count == 0.0:
            self._min = that._min
            self._max = that._max
        elif that._count != 0.0:
            self._min = min(self._min, that._min)
            self._max = max(self._max, that._max)

        self._count = sum_count
        self._eta = sum_eta
        self._rho = sum_rho
        self._tau = sum_tau
        self._phi = sum_phi

        return self

    def __mul__(self, that):
        """Multiply by a scalar to change Statistics weighting."""
        sigma = self.copy()
        sigma *= that
        return sigma

    __rmul__ = __mul__

    def __imul__(self, that):
        """Multiply by a scalar to change Statistics weighting in-place.

        Scales the count and the moment sums; the mean, minimum and maximum
        are left unchanged.

        """
        that = float(that)
        self._count *= that
        self._rho *= that
        self._tau *= that
        self._phi *= that
        return self
def make_statistics(state):
    """Rebuild a Statistics object from a state tuple.

    Returned by `Statistics.__reduce__` as the reconstruction callable.

    """
    restored = Statistics.fromstate(state)
    return restored
class Regression(object):
    """Compute simple linear regression in a single pass.

    Computes the slope, intercept, and correlation.
    Regression objects may also be added together and copied.

    Based entirely on the C++ code by John D Cook at
    http://www.johndcook.com/running_regression.html

    Internal state:

    * ``_xstats``, ``_ystats`` -- Statistics summaries of the x and y
      coordinates.
    * ``_count`` -- number of pairs pushed.
    * ``_sxy`` -- running sum of the products of the x and y deviations.

    """

    def __init__(self, iterable=()):
        """Initialize Regression object.

        Iterates optional parameter `iterable` and pushes each pair into the
        regression summary.

        """
        self._xstats = Statistics()
        self._ystats = Statistics()
        self.clear()
        for xcoord, ycoord in iterable:
            self.push(xcoord, ycoord)

    @classmethod
    def _states_equal(cls, lhs, rhs):
        """Return True when two state values match, treating NaN as equal.

        The nested Statistics states hold NaN for the minimum and maximum
        while empty, and NaN never compares equal to itself, so a plain tuple
        comparison would report two empty regressions as different.

        """
        if isinstance(lhs, tuple) and isinstance(rhs, tuple):
            return len(lhs) == len(rhs) and all(
                cls._states_equal(one, two) for one, two in zip(lhs, rhs)
            )
        # `x != x` is only true for NaN.
        return lhs == rhs or (lhs != lhs and rhs != rhs)

    def __eq__(self, that):
        """Return True when `that` carries the same internal state.

        Returns NotImplemented for operands that expose no `get_state`, so
        comparing against unrelated objects yields False instead of raising.

        """
        get_state = getattr(that, 'get_state', None)
        if get_state is None:
            return NotImplemented
        return self._states_equal(self.get_state(), get_state())

    def __ne__(self, that):
        """Return the negation of `__eq__` (needed explicitly on Python 2)."""
        equal = self.__eq__(that)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

    def clear(self):
        """Clear Regression object."""
        self._xstats.clear()
        self._ystats.clear()
        self._count = self._sxy = 0.0

    def get_state(self):
        """Get internal state.

        Returns the tuple ``(count, sxy, xstats_state, ystats_state)``.

        """
        return (
            self._count,
            self._sxy,
            self._xstats.get_state(),
            self._ystats.get_state()
        )

    def set_state(self, state):
        """Set internal state from a 4-item sequence as made by `get_state`."""
        count, sxy, xstats, ystats = state
        self._count = count
        self._sxy = sxy
        self._xstats.set_state(xstats)
        self._ystats.set_state(ystats)

    @classmethod
    def fromstate(cls, state):
        """Return Regression object from state."""
        regr = cls()
        regr.set_state(state)
        return regr

    def __reduce__(self):
        """Support pickling by rebuilding from the state tuple."""
        return make_regression, (self.get_state(),)

    def copy(self, _=None):
        """Copy Regression object.

        The ignored positional parameter lets this method double as
        `__deepcopy__`, which is called with a memo dictionary.

        """
        return self.fromstate(self.get_state())

    __copy__ = copy
    __deepcopy__ = copy

    def __len__(self):
        """Number of values that have been pushed."""
        return int(self._count)

    def push(self, xcoord, ycoord):
        """Add a pair `(x, y)` to the Regression summary."""
        # The cross-product update must read the means *before* the new pair
        # is pushed into the per-axis summaries.
        self._sxy += (
            (self._xstats.mean() - xcoord)
            * (self._ystats.mean() - ycoord)
            * self._count
            / (self._count + 1)
        )
        self._xstats.push(xcoord)
        self._ystats.push(ycoord)
        self._count += 1

    def slope(self, ddof=1.0):
        """Slope of values (with `ddof` degrees of freedom)."""
        sxx = self._xstats.variance(ddof) * (self._count - ddof)
        return self._sxy / sxx

    def intercept(self, ddof=1.0):
        """Intercept of values (with `ddof` degrees of freedom)."""
        return self._ystats.mean() - self.slope(ddof) * self._xstats.mean()

    def correlation(self, ddof=1.0):
        """Correlation of values (with `ddof` degrees of freedom)."""
        term = self._xstats.stddev(ddof) * self._ystats.stddev(ddof)
        return self._sxy / ((self._count - ddof) * term)

    def __add__(self, that):
        """Add two Regression objects together."""
        sigma = self.copy()
        sigma += that
        return sigma

    def __iadd__(self, that):
        """Add another Regression object to this one."""
        sum_count = self._count + that._count
        if sum_count == 0:
            return self

        # All combined values are computed before any attribute is assigned,
        # so the formulas always read the pre-merge state of both operands.
        sum_xstats = self._xstats + that._xstats
        sum_ystats = self._ystats + that._ystats

        deltax = that._xstats.mean() - self._xstats.mean()
        deltay = that._ystats.mean() - self._ystats.mean()
        sum_sxy = (
            self._sxy + that._sxy
            + self._count * that._count * deltax * deltay / sum_count
        )

        self._count = sum_count
        self._xstats = sum_xstats
        self._ystats = sum_ystats
        self._sxy = sum_sxy

        return self
def make_regression(state):
    """Rebuild a Regression object from a state tuple.

    Returned by `Regression.__reduce__` as the reconstruction callable.

    """
    restored = Regression.fromstate(state)
    return restored