Source code for btQuant.distributions

import numpy as np

def _normCdf(x):
    """Standard normal CDF."""
    return 0.5 * (1.0 + np.tanh(x / np.sqrt(2.0) * 0.7978845608))

def _normPdf(x):
    """Standard normal PDF."""
    return np.exp(-0.5 * x**2) / np.sqrt(2 * np.pi)

def _normPpf(p):
    """Standard normal quantile function."""
    if p <= 0 or p >= 1:
        return np.nan
    
    if p < 0.5:
        sign = -1
        p = 1 - p
    else:
        sign = 1
    
    t = np.sqrt(-2 * np.log(1 - p))
    c0, c1, c2 = 2.515517, 0.802853, 0.010328
    d1, d2, d3 = 1.432788, 0.189269, 0.001308
    x = t - (c0 + c1 * t + c2 * t**2) / (1 + d1 * t + d2 * t**2 + d3 * t**3)
    
    return sign * x

def _chiSqCdf(x, df):
    """Chi-squared CDF approximation."""
    if x <= 0:
        return 0.0
    if df == 1:
        return 2 * _normCdf(np.sqrt(x)) - 1
    return _gammaInc(df / 2, x / 2)

def _gammaInc(a, x):
    """Incomplete gamma function."""
    if x < 0 or a <= 0:
        return 0.0
    
    if x < a + 1:
        ap, delta, sumVal = a, 1.0 / a, 1.0 / a
        for n in range(1, 100):
            ap += 1
            delta *= x / ap
            sumVal += delta
            if delta < sumVal * 1e-10:
                break
        return sumVal * np.exp(-x + a * np.log(x) - _logGamma(a))
    else:
        b, c, d, h = x + 1 - a, 1.0 / 1e-30, 1.0 / b, 1.0 / b
        for i in range(1, 100):
            an = -i * (i - a)
            b += 2.0
            d = an * d + b
            if abs(d) < 1e-30:
                d = 1e-30
            c = b + an / c
            if abs(c) < 1e-30:
                c = 1e-30
            d = 1.0 / d
            delta = d * c
            h *= delta
            if abs(delta - 1.0) < 1e-10:
                break
        return 1.0 - h * np.exp(-x + a * np.log(x) - _logGamma(a))

def _logGamma(x):
    """Log gamma function."""
    if x <= 0:
        return np.inf
    
    cof = [76.18009172947146, -86.50532032941677, 24.01409824083091,
           -1.231739572450155, 0.1208650973866179e-2, -0.5395239384953e-5]
    
    y = x
    tmp = x + 5.5
    tmp -= (x + 0.5) * np.log(tmp)
    ser = 1.000000000190015
    
    for c in cof:
        y += 1
        ser += c / y
    
    return -tmp + np.log(2.5066282746310005 * ser / x)

[docs] def fitNormal(data): """ Fit normal distribution. Parameters: data: array of observations Returns: dict with mu, sigma, logLikelihood """ data = np.asarray(data) mu = np.mean(data) sigma = np.std(data, ddof=1) logLik = -0.5 * len(data) * np.log(2 * np.pi * sigma**2) - np.sum((data - mu)**2) / (2 * sigma**2) return {'mu': mu, 'sigma': sigma, 'logLikelihood': logLik}
[docs] def fitLognormal(data): """ Fit lognormal distribution. Parameters: data: array of positive observations Returns: dict with mu, sigma (of log), logLikelihood """ data = np.asarray(data) data = data[data > 0] logData = np.log(data) mu = np.mean(logData) sigma = np.std(logData, ddof=1) logLik = -0.5 * len(data) * np.log(2 * np.pi * sigma**2) - np.sum((logData - mu)**2) / (2 * sigma**2) logLik -= np.sum(np.log(data)) return {'mu': mu, 'sigma': sigma, 'logLikelihood': logLik}
[docs] def fitExponential(data): """ Fit exponential distribution. Parameters: data: array of positive observations Returns: dict with lambda (rate), logLikelihood """ data = np.asarray(data) data = data[data > 0] lambdaParam = 1.0 / np.mean(data) logLik = len(data) * np.log(lambdaParam) - lambdaParam * np.sum(data) return {'lambda': lambdaParam, 'logLikelihood': logLik}
[docs] def fitGamma(data, maxIter=100): """ Fit gamma distribution using method of moments. Parameters: data: array of positive observations maxIter: maximum iterations Returns: dict with alpha (shape), beta (rate), logLikelihood """ data = np.asarray(data) data = data[data > 0] meanData = np.mean(data) varData = np.var(data, ddof=1) alpha = meanData**2 / varData beta = meanData / varData logLik = len(data) * (alpha * np.log(beta) - _logGamma(alpha)) + (alpha - 1) * np.sum(np.log(data)) - beta * np.sum(data) return {'alpha': alpha, 'beta': beta, 'logLikelihood': logLik}
[docs] def fitBeta(data, maxIter=100): """ Fit beta distribution using method of moments. Parameters: data: array of observations in (0, 1) maxIter: maximum iterations Returns: dict with alpha, beta parameters, logLikelihood """ data = np.asarray(data) data = data[(data > 0) & (data < 1)] meanData = np.mean(data) varData = np.var(data, ddof=1) commonTerm = meanData * (1 - meanData) / varData - 1 alpha = meanData * commonTerm beta = (1 - meanData) * commonTerm alpha = max(alpha, 0.1) beta = max(beta, 0.1) logLik = len(data) * (_logGamma(alpha + beta) - _logGamma(alpha) - _logGamma(beta)) logLik += (alpha - 1) * np.sum(np.log(data)) + (beta - 1) * np.sum(np.log(1 - data)) return {'alpha': alpha, 'beta': beta, 'logLikelihood': logLik}
[docs] def fitT(data, maxIter=50): """ Fit Student's t distribution. Parameters: data: array of observations maxIter: maximum iterations Returns: dict with df (degrees of freedom), mu, sigma, logLikelihood """ data = np.asarray(data) mu = np.median(data) sigma = np.std(data, ddof=1) kurt = _kurtosis(data) if kurt > 0: df = 6.0 / kurt + 4 else: df = 10.0 df = max(df, 2.5) logLik = 0.0 return {'df': df, 'mu': mu, 'sigma': sigma, 'logLikelihood': logLik}
def _kurtosis(x): """Excess kurtosis.""" n = len(x) mean = np.mean(x) m2 = np.sum((x - mean)**2) / n m4 = np.sum((x - mean)**4) / n return m4 / (m2**2 + 1e-10) - 3 def _skewness(x): """Skewness.""" n = len(x) mean = np.mean(x) m2 = np.sum((x - mean)**2) / n m3 = np.sum((x - mean)**3) / n return m3 / (m2**1.5 + 1e-10)
[docs] def moments(data): """ Calculate distribution moments. Parameters: data: array of observations Returns: dict with mean, variance, skewness, kurtosis """ data = np.asarray(data) return { 'mean': np.mean(data), 'variance': np.var(data, ddof=1), 'skewness': _skewness(data), 'kurtosis': _kurtosis(data) }
[docs] def ksTest(data, distName='normal', params=None): """ Kolmogorov-Smirnov test for distribution fit. Parameters: data: array of observations distName: 'normal', 'lognormal', 'exponential' params: dict of distribution parameters (if None, estimated) Returns: dict with statistic, pValue (approximate) """ data = np.asarray(data) n = len(data) sortedData = np.sort(data) if params is None: if distName == 'normal': params = fitNormal(data) elif distName == 'lognormal': params = fitLognormal(data) elif distName == 'exponential': params = fitExponential(data) else: raise ValueError("Unknown distribution") empiricalCdf = np.arange(1, n + 1) / n if distName == 'normal': theoreticalCdf = _normCdf((sortedData - params['mu']) / params['sigma']) elif distName == 'lognormal': theoreticalCdf = _normCdf((np.log(sortedData) - params['mu']) / params['sigma']) elif distName == 'exponential': theoreticalCdf = 1 - np.exp(-params['lambda'] * sortedData) else: raise ValueError("Unknown distribution") dPlus = np.max(empiricalCdf - theoreticalCdf) dMinus = np.max(theoreticalCdf - (empiricalCdf - 1 / n)) statistic = max(dPlus, dMinus) pValue = np.exp(-2 * n * statistic**2) return {'statistic': statistic, 'pValue': pValue}
[docs] def adTest(data, distName='normal'): """ Anderson-Darling test for normality. Parameters: data: array of observations distName: currently only 'normal' supported Returns: dict with statistic, critical values """ data = np.asarray(data) n = len(data) if distName != 'normal': raise ValueError("Only normal distribution currently supported") mean = np.mean(data) std = np.std(data, ddof=1) z = (data - mean) / std z = np.sort(z) i = np.arange(1, n + 1) cdf = _normCdf(z) S = -n - np.sum((2 * i - 1) * (np.log(cdf) + np.log(1 - cdf[::-1]))) / n criticalValues = { '15%': 0.576, '10%': 0.656, '5%': 0.787, '2.5%': 0.918, '1%': 1.092 } return {'statistic': S, 'criticalValues': criticalValues}
[docs] def klDivergence(p, q): """ Kullback-Leibler divergence. Parameters: p: true distribution (probabilities) q: approximate distribution (probabilities) Returns: KL divergence """ p = np.asarray(p) q = np.asarray(q) p = p / np.sum(p) q = q / np.sum(q) p = np.clip(p, 1e-12, 1) q = np.clip(q, 1e-12, 1) return np.sum(p * np.log(p / q))
[docs] def jsDivergence(p, q): """ Jensen-Shannon divergence. Parameters: p: first distribution q: second distribution Returns: JS divergence """ p = np.asarray(p) q = np.asarray(q) p = p / np.sum(p) q = q / np.sum(q) m = 0.5 * (p + q) return 0.5 * (klDivergence(p, m) + klDivergence(q, m))
[docs] def fitMixture(data, nComponents=2, maxIter=100): """ Fit Gaussian mixture model using EM algorithm. Parameters: data: array of observations nComponents: number of mixture components maxIter: maximum iterations Returns: dict with means, sigmas, weights """ data = np.asarray(data).reshape(-1, 1) n = len(data) means = np.linspace(np.min(data), np.max(data), nComponents).reshape(-1, 1) sigmas = np.ones((nComponents, 1)) * np.std(data) weights = np.ones(nComponents) / nComponents for iteration in range(maxIter): responsibilities = np.zeros((n, nComponents)) for k in range(nComponents): diff = data - means[k] responsibilities[:, k] = (weights[k] * np.exp(-0.5 * (diff / sigmas[k])**2).flatten() / (sigmas[k] * np.sqrt(2 * np.pi))) responsibilities /= (np.sum(responsibilities, axis=1, keepdims=True) + 1e-10) nK = np.sum(responsibilities, axis=0) for k in range(nComponents): means[k] = np.sum(responsibilities[:, k:k+1] * data, axis=0) / (nK[k] + 1e-10) diff = data - means[k] sigmas[k] = np.sqrt(np.sum(responsibilities[:, k:k+1] * diff**2, axis=0) / (nK[k] + 1e-10)) weights[k] = nK[k] / n return { 'means': means.flatten(), 'sigmas': sigmas.flatten(), 'weights': weights }
[docs] def quantile(data, q): """ Calculate quantile. Parameters: data: array of observations q: quantile level (0 to 1) Returns: quantile value """ data = np.asarray(data) return np.percentile(data, q * 100)
[docs] def qqPlot(data, distName='normal'): """ Generate Q-Q plot data. Parameters: data: array of observations distName: reference distribution Returns: dict with theoretical and empirical quantiles """ data = np.asarray(data) n = len(data) sortedData = np.sort(data) probabilities = (np.arange(1, n + 1) - 0.5) / n if distName == 'normal': mean = np.mean(data) std = np.std(data, ddof=1) theoreticalQuantiles = mean + std * np.array([_normPpf(p) for p in probabilities]) else: raise ValueError("Only normal distribution currently supported") return { 'theoretical': theoreticalQuantiles, 'empirical': sortedData }