Source code for btQuant.econometrics

import numpy as np

def _tCdf(x, df):
    """Student's t cumulative distribution function."""
    a = df / (df + x**2)
    return 1.0 - 0.5 * _betaInc(df / 2, 0.5, a)

def _betaInc(a, b, x):
    """Incomplete beta function approximation."""
    if x <= 0:
        return 0
    if x >= 1:
        return 1
    
    bt = np.exp(a * np.log(x) + b * np.log(1 - x) - _logBeta(a, b))
    
    if x < (a + 1) / (a + b + 2):
        return bt * _betaCf(a, b, x) / a
    else:
        return 1 - bt * _betaCf(b, a, 1 - x) / b

def _betaCf(a, b, x, maxIter=100):
    """Continued fraction for incomplete beta function."""
    qab = a + b
    qap = a + 1
    qam = a - 1
    c = 1.0
    d = 1.0 - qab * x / qap
    
    if abs(d) < 1e-30:
        d = 1e-30
    d = 1.0 / d
    h = d
    
    for m in range(1, maxIter):
        m2 = 2 * m
        aa = m * (b - m) * x / ((qam + m2) * (a + m2))
        d = 1.0 + aa * d
        if abs(d) < 1e-30:
            d = 1e-30
        c = 1.0 + aa / c
        if abs(c) < 1e-30:
            c = 1e-30
        d = 1.0 / d
        h *= d * c
        
        aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2))
        d = 1.0 + aa * d
        if abs(d) < 1e-30:
            d = 1e-30
        c = 1.0 + aa / c
        if abs(c) < 1e-30:
            c = 1e-30
        d = 1.0 / d
        delta = d * c
        h *= delta
        
        if abs(delta - 1.0) < 1e-8:
            break
    
    return h

def _logBeta(a, b):
    """Log beta function."""
    return _logGamma(a) + _logGamma(b) - _logGamma(a + b)

def _logGamma(x):
    """Log gamma function approximation."""
    if x <= 0:
        return np.inf
    
    cof = [76.18009172947146, -86.50532032941677, 24.01409824083091,
           -1.231739572450155, 0.1208650973866179e-2, -0.5395239384953e-5]
    
    y = x
    tmp = x + 5.5
    tmp -= (x + 0.5) * np.log(tmp)
    ser = 1.000000000190015
    
    for c in cof:
        y += 1
        ser += c / y
    
    return -tmp + np.log(2.5066282746310005 * ser / x)

def _chiSqCdf(x, df):
    """Chi-squared cumulative distribution function."""
    return _gammaInc(df / 2, x / 2)

def _gammaInc(a, x):
    """Incomplete gamma function."""
    if x < 0 or a <= 0:
        return 0.0
    
    if x < a + 1:
        ap = a
        delta = 1.0 / a
        sumVal = delta
        for n in range(1, 100):
            ap += 1
            delta *= x / ap
            sumVal += delta
            if delta < sumVal * 1e-10:
                break
        return sumVal * np.exp(-x + a * np.log(x) - _logGamma(a))
    else:
        b = x + 1 - a
        c = 1.0 / 1e-30
        d = 1.0 / b
        h = d
        for i in range(1, 100):
            an = -i * (i - a)
            b += 2.0
            d = an * d + b
            if abs(d) < 1e-30:
                d = 1e-30
            c = b + an / c
            if abs(c) < 1e-30:
                c = 1e-30
            d = 1.0 / d
            delta = d * c
            h *= delta
            if abs(delta - 1.0) < 1e-10:
                break
        return 1.0 - h * np.exp(-x + a * np.log(x) - _logGamma(a))

[docs] def ols(y, X, addConst=True, robust=False, covType='HC3'): """ Ordinary least squares regression with optional robust standard errors. Parameters: y: dependent variable (1D array) X: independent variables (2D array or 1D for single predictor) addConst: add constant term robust: use robust standard errors covType: 'HC0', 'HC1', 'HC2', 'HC3', 'HC4' Returns: dict with coefficients, std_errors, t_stats, p_values, r_squared, etc. """ y = np.asarray(y).flatten() X = np.asarray(X) if X.ndim == 1: X = X.reshape(-1, 1) if addConst: X = np.column_stack([np.ones(len(y)), X]) n, k = X.shape XtXInv = np.linalg.inv(X.T @ X) beta = XtXInv @ X.T @ y residuals = y - X @ beta yHat = X @ beta df = n - k SSR = np.sum((yHat - np.mean(y))**2) SST = np.sum((y - np.mean(y))**2) SSE = np.sum(residuals**2) rSquared = SSR / SST adjRSquared = 1 - (1 - rSquared) * (n - 1) / df if not robust: sigma2 = SSE / df covMatrix = sigma2 * XtXInv stdErrors = np.sqrt(np.diag(covMatrix)) else: if covType == 'HC0': u2 = residuals**2 elif covType == 'HC1': u2 = residuals**2 * (n / df) elif covType == 'HC2': h = np.diag(X @ XtXInv @ X.T) u2 = residuals**2 / (1 - h) elif covType == 'HC3': h = np.diag(X @ XtXInv @ X.T) u2 = residuals**2 / (1 - h)**2 elif covType == 'HC4': h = np.diag(X @ XtXInv @ X.T) delta = np.minimum(4, h / np.mean(h)) u2 = residuals**2 / (1 - h)**delta else: raise ValueError("covType must be HC0, HC1, HC2, HC3, or HC4") Xu2 = X * u2[:, np.newaxis] covMatrix = XtXInv @ (X.T @ Xu2) @ XtXInv stdErrors = np.sqrt(np.diag(covMatrix)) tStats = beta / stdErrors pValues = 2 * (1 - _tCdf(np.abs(tStats), df)) tCrit = 1.96 ciLower = beta - tCrit * stdErrors ciUpper = beta + tCrit * stdErrors fStat = None fPValue = None if addConst and k > 1: rss1 = SSE XRestricted = X[:, 0].reshape(-1, 1) betaRestricted = np.linalg.inv(XRestricted.T @ XRestricted) @ XRestricted.T @ y residualsRestricted = y - XRestricted @ betaRestricted rss0 = np.sum(residualsRestricted**2) fStat = ((rss0 - rss1) / (k - 1)) / (rss1 / df) fPValue = 1 - _chiSqCdf(fStat * (k - 1), k - 1) return { 'coefficients': beta, 'stdErrors': stdErrors, 'tStats': tStats, 'pValues': pValues, 'confIntLower': ciLower, 'confIntUpper': ciUpper, 'residuals': residuals, 'fittedValues': yHat, 'rSquared': rSquared, 'adjRSquared': adjRSquared, 'fStat': fStat, 'fPValue': fPValue, 'nObs': n, 'df': df, 'sse': SSE, 'covMatrix': covMatrix }
[docs] def whiteTest(y, X, addConst=True): """ White's test for heteroskedasticity. Parameters: y: dependent variable X: independent variables addConst: add constant term Returns: dict with testStatistic, pValue, df, conclusion """ results = ols(y, X, addConst=addConst) resid = results['residuals'] residSq = resid**2 XOrig = np.asarray(X) if XOrig.ndim == 1: XOrig = XOrig.reshape(-1, 1) n, k = XOrig.shape XWhite = [XOrig[:, i] for i in range(k)] XWhite.extend([XOrig[:, i]**2 for i in range(k)]) for i in range(k): for j in range(i + 1, k): XWhite.append(XOrig[:, i] * XOrig[:, j]) XWhite = np.column_stack(XWhite) auxResults = ols(residSq, XWhite, addConst=True) testStat = n * auxResults['rSquared'] dfTest = XWhite.shape[1] pValue = 1 - _chiSqCdf(testStat, dfTest) conclusion = "Reject H0: homoskedasticity" if pValue < 0.05 else "Fail to reject H0" return { 'testStatistic': testStat, 'pValue': pValue, 'df': dfTest, 'conclusion': conclusion }
[docs] def breuschPaganTest(y, X, addConst=True): """ Breusch-Pagan test for heteroskedasticity. Parameters: y: dependent variable X: independent variables addConst: add constant term Returns: dict with testStatistic, pValue, df, conclusion """ results = ols(y, X, addConst=addConst) resid = results['residuals'] n = len(resid) sigma2 = np.sum(resid**2) / n residSqNorm = resid**2 / sigma2 bpResults = ols(residSqNorm, X, addConst=addConst) explainedSS = bpResults['rSquared'] * np.sum((residSqNorm - np.mean(residSqNorm))**2) testStat = 0.5 * explainedSS XArr = np.asarray(X) dfTest = XArr.shape[1] if XArr.ndim > 1 else 1 pValue = 1 - _chiSqCdf(testStat, dfTest) conclusion = "Reject H0: homoskedasticity" if pValue < 0.05 else "Fail to reject H0" return { 'testStatistic': testStat, 'pValue': pValue, 'df': dfTest, 'conclusion': conclusion }
[docs] def durbinWatson(residuals): """ Durbin-Watson statistic for autocorrelation. DW ~ 2: no autocorrelation 0 < DW < 2: positive autocorrelation 2 < DW < 4: negative autocorrelation Parameters: residuals: residuals from regression Returns: float: Durbin-Watson statistic """ residuals = np.asarray(residuals) diffSquared = np.sum(np.diff(residuals)**2) residSquared = np.sum(residuals**2) return diffSquared / residSquared
[docs] def ljungBox(residuals, lags=None): """ Ljung-Box test for autocorrelation in residuals. Parameters: residuals: residuals from regression lags: number of lags (default min(10, n//5)) Returns: dict with lags, autocorrelations, qStats, pValues, criticalValues """ residuals = np.asarray(residuals) n = len(residuals) if lags is None: lags = min(10, n // 5) acfValues = [] for k in range(1, lags + 1): numerator = np.sum(residuals[k:] * residuals[:-k]) denominator = np.sum(residuals**2) acfValues.append(numerator / denominator) acfValues = np.array(acfValues) qStats = [] pValues = [] for l in range(1, lags + 1): q = n * (n + 2) * np.sum((acfValues[:l]**2) / (n - np.arange(1, l + 1))) qStats.append(q) pValues.append(1 - _chiSqCdf(q, l)) critVals = [_chiSqInv(0.95, i) for i in range(1, lags + 1)] return { 'lags': list(range(1, lags + 1)), 'autocorrelations': acfValues, 'qStats': qStats, 'pValues': pValues, 'criticalValues': critVals }
def _chiSqInv(p, df): """Inverse chi-squared distribution.""" if p <= 0 or p >= 1: return np.nan x = df for _ in range(50): cdf = _chiSqCdf(x, df) pdf = (x ** (df / 2 - 1) * np.exp(-x / 2)) / (2 ** (df / 2) * np.exp(_logGamma(df / 2))) xNew = x - (cdf - p) / pdf if abs(xNew - x) < 1e-8: return xNew x = xNew return x
[docs] def adfTest(series, lags=None, regression='c', autolag='AIC'): """ Augmented Dickey-Fuller test for unit root. Parameters: series: time series lags: number of lags (auto if None) regression: 'nc' (no constant), 'c' (constant), 'ct' (constant+trend) autolag: 'AIC' or 'BIC' Returns: dict with testStatistic, pValue, criticalValues, conclusion, optimalLag """ series = np.asarray(series) n = len(series) if lags is None: maxLags = int(np.ceil(12 * (n / 100)**(1 / 4))) else: maxLags = lags dy = np.diff(series) y1 = series[:-1] if lags is None and autolag is not None: results = {} for p in range(maxLags + 1): X = [] if regression == 'c': X.append(np.ones(n - 1)) elif regression == 'ct': X.append(np.ones(n - 1)) X.append(np.arange(1, n)) X.append(y1) for i in range(1, p + 1): lag = np.zeros(n - 1) lag[i:] = dy[:-i] X.append(lag) if not X: X = np.zeros((n - 1, 0)) else: X = np.column_stack(X) model = ols(dy, X, addConst=False) k = X.shape[1] ssr = np.sum(model['residuals']**2) if autolag == 'AIC': ic = np.log(ssr / (n - 1)) + 2 * k / (n - 1) else: ic = np.log(ssr / (n - 1)) + k * np.log(n - 1) / (n - 1) results[p] = {'ic': ic, 'model': model, 'X': X} optimalLag = min(results.keys(), key=lambda x: results[x]['ic']) model = results[optimalLag]['model'] else: p = maxLags X = [] if regression == 'c': X.append(np.ones(n - 1)) elif regression == 'ct': X.append(np.ones(n - 1)) X.append(np.arange(1, n)) X.append(y1) for i in range(1, p + 1): lag = np.zeros(n - 1) lag[i:] = dy[:-i] X.append(lag) if not X: X = np.zeros((n - 1, 0)) else: X = np.column_stack(X) model = ols(dy, X, addConst=False) optimalLag = maxLags idx = 0 if regression == 'c': idx = 1 elif regression == 'ct': idx = 2 adfStat = model['tStats'][idx] if regression == 'nc': critVals = {'1%': -2.58, '5%': -1.95, '10%': -1.62} elif regression == 'c': critVals = {'1%': -3.43, '5%': -2.86, '10%': -2.57} else: critVals = {'1%': -3.96, '5%': -3.41, '10%': -3.13} pValue = 0.05 conclusion = "Reject H0: unit root" if adfStat < critVals['5%'] else "Fail to reject H0" return { 'testStatistic': adfStat, 'pValue': pValue, 'criticalValues': critVals, 'conclusion': conclusion, 'optimalLag': optimalLag }
[docs] def kpssTest(series, lags=None, regression='c'): """ KPSS test for stationarity. Parameters: series: time series lags: number of lags (auto if None) regression: 'c' (constant) or 'ct' (constant+trend) Returns: dict with testStatistic, pValue, criticalValues, conclusion, lags """ series = np.asarray(series) n = len(series) if lags is None: lags = int(np.ceil(12 * (n / 100)**(1 / 4))) if regression == 'c': resid = series - np.mean(series) else: t = np.arange(1, n + 1) X = np.column_stack([np.ones(n), t]) beta = np.linalg.lstsq(X, series, rcond=None)[0] resid = series - X @ beta s = np.cumsum(resid) gamma0 = np.sum(resid**2) / n autoCov = np.zeros(lags + 1) autoCov[0] = gamma0 for l in range(1, lags + 1): autoCov[l] = np.sum(resid[l:] * resid[:-l]) / n w = 1 - np.arange(1, lags + 1) / (lags + 1) s2 = gamma0 + 2 * np.sum(w * autoCov[1:]) kpssStat = np.sum(s**2) / (n**2 * s2) if regression == 'c': critVals = {'1%': 0.739, '5%': 0.463, '10%': 0.347} else: critVals = {'1%': 0.216, '5%': 0.146, '10%': 0.119} pValue = 0.05 conclusion = "Reject H0: stationarity" if kpssStat > critVals['5%'] else "Fail to reject H0" return { 'testStatistic': kpssStat, 'pValue': pValue, 'criticalValues': critVals, 'conclusion': conclusion, 'lags': lags }
[docs] def grangerCausality(y, x, maxLag=4): """ Granger causality test: does x Granger-cause y? Parameters: y: dependent variable (time series) x: independent variable (time series) maxLag: maximum lag to test Returns: dict with lags, fStats, pValues, conclusion """ y = np.asarray(y) x = np.asarray(x) n = len(y) fStats = [] pValues = [] for lag in range(1, maxLag + 1): yLagged = [] xLagged = [] for i in range(1, lag + 1): yLagged.append(y[lag - i:n - i]) xLagged.append(x[lag - i:n - i]) yTrain = y[lag:] XRestricted = np.column_stack(yLagged) XUnrestricted = np.column_stack(yLagged + xLagged) modelRestricted = ols(yTrain, XRestricted, addConst=True) modelUnrestricted = ols(yTrain, XUnrestricted, addConst=True) rss1 = modelRestricted['sse'] rss2 = modelUnrestricted['sse'] dfNum = lag dfDenom = n - lag - 2 * lag - 1 if dfDenom > 0 and rss2 > 0: fStat = ((rss1 - rss2) / dfNum) / (rss2 / dfDenom) pValue = 1 - _chiSqCdf(fStat * dfNum, dfNum) else: fStat = np.nan pValue = np.nan fStats.append(fStat) pValues.append(pValue) minPValue = np.nanmin(pValues) if len(pValues) > 0 else 1.0 conclusion = "x Granger-causes y" if minPValue < 0.05 else "No Granger causality" return { 'lags': list(range(1, maxLag + 1)), 'fStats': fStats, 'pValues': pValues, 'conclusion': conclusion }