Source code for btQuant.fit

import numpy as np

[docs] def fitGbm(prices, dt=1/252): """ Fit Geometric Brownian Motion to price data. Parameters: prices: array of prices dt: time step (default 1/252 for daily) Returns: dict with mu (drift), sigma (volatility) """ prices = np.asarray(prices, dtype=float) logReturns = np.diff(np.log(prices)) variance = np.var(logReturns, ddof=1) mu = np.mean(logReturns) / dt + 0.5 * variance / dt sigma = np.sqrt(variance / dt) return {'mu': mu, 'sigma': sigma}
[docs] def fitOu(spread, dt=1/252): """ Fit Ornstein-Uhlenbeck process to spread data. Parameters: spread: array of spread/price data dt: time step Returns: dict with theta (mean reversion), mu (long-term mean), sigma, halfLife """ spread = np.asarray(spread, dtype=float) n = len(spread) - 1 x = spread[:-1] y = spread[1:] Sx = np.sum(x) Sy = np.sum(y) Sxx = np.sum(x * x) Syy = np.sum(y * y) Sxy = np.sum(x * y) denom = n * (Sxx - Sxy) - (Sx * Sx - Sx * Sy) mu = (Sy * Sxx - Sx * Sxy) / denom numerator = Sxy - mu * Sx - mu * Sy + n * mu * mu denominator = Sxx - 2 * mu * Sx + n * mu * mu theta = -np.log(numerator / denominator) a = np.exp(-theta) sigmah2 = (Syy - 2 * a * Sxy + a * a * Sxx - 2 * mu * (1 - a) * (Sy - a * Sx) + n * mu * mu * (1 - a) * (1 - a)) / n sigma = np.sqrt(sigmah2 * 2 * theta / (1 - a * a)) halfLife = np.log(2) / theta / dt return { 'theta': theta / dt, 'mu': mu, 'sigma': sigma * np.sqrt(1 / dt), 'halfLife': halfLife }
[docs] def fitLevyOu(spread, jumpDetectionThreshold=0.4, dt=1/252): """ Fit Lévy OU (jump-diffusion OU) model to spread data. Parameters: spread: array of spread/price data jumpDetectionThreshold: Bayesian jump detection threshold dt: time step Returns: dict with theta, mu, sigma, halfLife, jumpLambda, jumpMu, jumpSigma """ spread = np.asarray(spread, dtype=float) diffs = np.diff(spread) n = len(diffs) mad = np.median(np.abs(diffs - np.median(diffs))) sigmaEst = 1.4826 * mad def normPdf(x, mean, std): return np.exp(-0.5 * ((x - mean) / std)**2) / (std * np.sqrt(2 * np.pi)) pPrior = 0.01 pNoJump = normPdf(diffs, 0, sigmaEst) pJump = normPdf(diffs, diffs, sigmaEst) likelihoodRatio = (pPrior * pJump) / ((1 - pPrior) * pNoJump + pPrior * pJump) jumpMask = likelihoodRatio > jumpDetectionThreshold jumpIndices = np.where(jumpMask)[0] + 1 jumpSizes = diffs[jumpMask] mask = np.ones(len(spread), dtype=bool) if len(jumpIndices) > 0: mask[jumpIndices] = False afterIndices = jumpIndices + 1 afterIndices = afterIndices[afterIndices < len(mask)] if len(afterIndices) > 0: mask[afterIndices] = False cleanSpread = spread[mask] if len(cleanSpread) < 10: cleanSpread = spread unconditionalStd = np.std(cleanSpread, ddof=1) if len(cleanSpread) > 1: lag1Corr = np.corrcoef(cleanSpread[:-1], cleanSpread[1:])[0, 1] lag1Corr = np.clip(lag1Corr, 0.01, 0.99) thetaEst = -np.log(lag1Corr) / dt thetaEst = np.clip(thetaEst, 0.1, 1 / dt) else: thetaEst = 1.0 sigmaEst = unconditionalStd * np.sqrt(2 * thetaEst) muEst = np.mean(cleanSpread) def negLogLikelihood(params): theta, mu, sigma = params X = cleanSpread[:-1] Y = cleanSpread[1:] drift = X + (mu - X) * (1 - np.exp(-theta * dt)) variance = np.maximum(sigma**2 * (1 - np.exp(-2 * theta * dt)) / (2 * theta), 1e-10) return -np.sum(-0.5 * np.log(2 * np.pi * variance) - (Y - drift)**2 / (2 * variance)) params = np.array([thetaEst, muEst, sigmaEst]) for iteration in range(50): ll = negLogLikelihood(params) eps = 1e-8 grad = np.zeros(3) for i in range(3): paramsPlus = params.copy() paramsPlus[i] += eps grad[i] = (negLogLikelihood(paramsPlus) - ll) / eps params -= 0.001 * grad params[0] = np.clip(params[0], 1e-4, 1 / dt) params[2] = np.clip(params[2], 1e-4, 2.0 * sigmaEst) theta, mu, sigma = params halfLife = np.log(2) / theta / dt jumpLambda = len(jumpIndices) / len(spread) jumpMu = np.mean(jumpSizes) if len(jumpSizes) > 0 else 0.0 jumpSigma = np.std(jumpSizes, ddof=1) if len(jumpSizes) > 1 else 0.0 return { 'theta': theta, 'mu': mu, 'sigma': sigma, 'halfLife': halfLife, 'jumpLambda': jumpLambda, 'jumpMu': jumpMu, 'jumpSigma': jumpSigma }
[docs] def fitAr1(series): """ Fit AR(1) model. Parameters: series: time series array Returns: dict with phi (AR coefficient), intercept, sigma2 (error variance) """ series = np.asarray(series, dtype=float) meanSeries = np.mean(series) seriesDemeaned = series - meanSeries phi = np.dot(seriesDemeaned[:-1], seriesDemeaned[1:]) / np.dot(seriesDemeaned[:-1], seriesDemeaned[:-1]) residuals = seriesDemeaned[1:] - phi * seriesDemeaned[:-1] sigma2 = np.var(residuals, ddof=1) intercept = meanSeries * (1 - phi) return {'phi': phi, 'intercept': intercept, 'sigma2': sigma2}
[docs] def fitArma(series, p=1, q=1, maxIter=100): """ Fit ARMA(p,q) model using approximate method. Parameters: series: time series p: AR order q: MA order maxIter: maximum iterations Returns: dict with arCoefs, maCoefs, sigma2 """ series = np.asarray(series, dtype=float) series = series - np.mean(series) n = len(series) arCoefs = np.zeros(p) for i in range(p): if i < n - 1: arCoefs[i] = np.corrcoef(series[:-i-1], series[i+1:])[0, 1] * 0.5 maCoefs = np.zeros(q) residuals = series.copy() for iteration in range(min(maxIter, 10)): for i in range(p): if i < n - 1: residuals[i+1:] -= arCoefs[i] * series[:n-i-1] sigma2 = np.var(residuals, ddof=1) return {'arCoefs': arCoefs, 'maCoefs': maCoefs, 'sigma2': sigma2}
[docs] def fitGarch(series, p=1, q=1, maxIter=50): """ Fit GARCH(p,q) model. Parameters: series: returns series p: ARCH order q: GARCH order maxIter: maximum iterations Returns: dict with omega, alphas, betas, aic, bic """ series = np.asarray(series, dtype=float) n = len(series) omega = np.var(series, ddof=1) * 0.1 alphas = np.ones(p) * 0.1 / p betas = np.ones(q) * 0.8 / q params = np.concatenate([[omega], alphas, betas]) def negLogLikelihood(params): omega = params[0] alphas = params[1:p+1] betas = params[p+1:] sigma2 = np.zeros(n) sigma2[:max(p, q)] = np.var(series, ddof=1) epsilon2 = series * series for t in range(max(p, q), n): sigma2[t] = omega for i in range(p): if t - i - 1 >= 0: sigma2[t] += alphas[i] * epsilon2[t - i - 1] for j in range(q): if t - j - 1 >= 0: sigma2[t] += betas[j] * sigma2[t - j - 1] sigma2 = np.maximum(sigma2, 1e-6) logLik = -0.5 * np.sum(np.log(2 * np.pi * sigma2) + epsilon2 / sigma2) return -logLik for iteration in range(maxIter): ll = negLogLikelihood(params) eps = 1e-8 grad = np.zeros(len(params)) for i in range(len(params)): paramsPlus = params.copy() paramsPlus[i] += eps grad[i] = (negLogLikelihood(paramsPlus) - ll) / eps params -= 0.001 * grad params[0] = max(params[0], 1e-6) params[1:] = np.clip(params[1:], 0, 0.99) ll = -negLogLikelihood(params) k = len(params) aic = 2 * k - 2 * ll bic = np.log(n) * k - 2 * ll omega = params[0] alphas = params[1:p+1] betas = params[p+1:] paramsDict = {'omega': omega} for i in range(p): paramsDict[f'alpha{i+1}'] = alphas[i] for j in range(q): paramsDict[f'beta{j+1}'] = betas[j] return {'params': paramsDict, 'aic': aic, 'bic': bic}
[docs] def fitHeston(prices, dt=1/252): """ Fit Heston stochastic volatility model. Parameters: prices: price series dt: time step Returns: dict with mu, kappa, theta, sigmaV, rho, v0 """ prices = np.asarray(prices, dtype=float) logReturns = np.diff(np.log(prices)) realizedVar = logReturns**2 / dt kappa = 2 * (1 - np.corrcoef(realizedVar[:-1], realizedVar[1:])[0, 1]) / dt theta = np.mean(realizedVar) sigmaV = np.std(np.diff(realizedVar), ddof=1) * np.sqrt(1 / dt) rho = np.corrcoef(logReturns[:-1], np.diff(realizedVar))[0, 1] mu = np.mean(logReturns) / dt return { 'mu': mu, 'kappa': kappa, 'theta': theta, 'sigmaV': sigmaV, 'rho': rho, 'v0': realizedVar[0] }
[docs] def fitCir(rates, dt=1/252): """ Fit Cox-Ingersoll-Ross model to interest rate data. Parameters: rates: interest rate series dt: time step Returns: dict with kappa, theta, sigma """ rates = np.asarray(rates, dtype=float) rates = np.maximum(rates, 1e-6) x = rates[:-1] y = rates[1:] n = len(x) Sx = np.sum(x) Sy = np.sum(y) Sxx = np.sum(x * x) Sxy = np.sum(x * y) denom = n * Sxx - Sx * Sx a = (Sy * Sxx - Sx * Sxy) / denom b = (n * Sxy - Sx * Sy) / denom theta = a / (1 - b) kappa = -np.log(b) / dt residuals = y - a - b * x sqrtX = np.sqrt(x) sigma = np.std(residuals / sqrtX, ddof=1) * np.sqrt(1 / dt) return {'kappa': kappa, 'theta': theta, 'sigma': sigma}
[docs] def fitVasicek(rates, dt=1/252): """ Fit Vasicek model to interest rate data. Parameters: rates: interest rate series dt: time step Returns: dict with kappa, theta, sigma """ rates = np.asarray(rates, dtype=float) x = rates[:-1] y = rates[1:] n = len(x) Sx = np.sum(x) Sy = np.sum(y) Sxx = np.sum(x * x) Sxy = np.sum(x * y) denom = n * Sxx - Sx * Sx a = (Sy * Sxx - Sx * Sxy) / denom b = (n * Sxy - Sx * Sy) / denom theta = a / (1 - b) kappa = -np.log(b) / dt residuals = y - a - b * x sigma = np.std(residuals, ddof=1) * np.sqrt(1 / dt) return {'kappa': kappa, 'theta': theta, 'sigma': sigma}
[docs] def fitJumpDiffusion(prices, dt=1/252, threshold=3.0): """ Fit jump-diffusion model by separating jumps from diffusion. Parameters: prices: price series dt: time step threshold: jump detection threshold (std devs) Returns: dict with mu, sigma, jumpLambda, jumpMu, jumpSigma """ prices = np.asarray(prices, dtype=float) logReturns = np.diff(np.log(prices)) medianReturn = np.median(logReturns) mad = np.median(np.abs(logReturns - medianReturn)) robustStd = 1.4826 * mad jumpMask = np.abs(logReturns - medianReturn) > threshold * robustStd normalReturns = logReturns[~jumpMask] jumpReturns = logReturns[jumpMask] if len(normalReturns) > 0: mu = np.mean(normalReturns) / dt sigma = np.std(normalReturns, ddof=1) / np.sqrt(dt) else: mu = 0.0 sigma = 0.1 jumpLambda = len(jumpReturns) / len(logReturns) / dt if len(jumpReturns) > 0: jumpMu = np.mean(jumpReturns) jumpSigma = np.std(jumpReturns, ddof=1) else: jumpMu = 0.0 jumpSigma = 0.0 return { 'mu': mu, 'sigma': sigma, 'jumpLambda': jumpLambda, 'jumpMu': jumpMu, 'jumpSigma': jumpSigma }
[docs] def fitCopula(data1, data2, copulaType='gaussian'): """ Fit copula to bivariate data. Parameters: data1: first variable data2: second variable copulaType: 'gaussian' (only type supported) Returns: dict with rho (correlation parameter) """ data1 = np.asarray(data1, dtype=float) data2 = np.asarray(data2, dtype=float) u1 = (np.argsort(np.argsort(data1)) + 1) / (len(data1) + 1) u2 = (np.argsort(np.argsort(data2)) + 1) / (len(data2) + 1) def normPpf(p): if p <= 0 or p >= 1: return 0 if p < 0.5: sign = -1 p = 1 - p else: sign = 1 t = np.sqrt(-2 * np.log(1 - p)) x = t - (2.515517 + 0.802853 * t + 0.010328 * t**2) / (1 + 1.432788 * t + 0.189269 * t**2 + 0.001308 * t**3) return sign * x z1 = np.array([normPpf(u) for u in u1]) z2 = np.array([normPpf(u) for u in u2]) rho = np.corrcoef(z1, z2)[0, 1] return {'rho': rho}
[docs] def fitDistributions(data, distributions=None): """ Fit multiple distributions and rank by AIC. Parameters: data: array of observations distributions: list of distribution names (None = all common) Returns: list of (distName, params, aic) sorted by AIC """ data = np.asarray(data, dtype=float) if distributions is None: distributions = ['normal', 'lognormal', 'exponential', 'gamma'] results = [] for distName in distributions: try: if distName == 'normal': mu = np.mean(data) sigma = np.std(data, ddof=1) logLik = -0.5 * len(data) * np.log(2 * np.pi * sigma**2) - np.sum((data - mu)**2) / (2 * sigma**2) aic = 2 * 2 - 2 * logLik results.append((distName, {'mu': mu, 'sigma': sigma}, aic)) elif distName == 'lognormal': positiveData = data[data > 0] if len(positiveData) > 0: logData = np.log(positiveData) mu = np.mean(logData) sigma = np.std(logData, ddof=1) logLik = -0.5 * len(positiveData) * np.log(2 * np.pi * sigma**2) - np.sum((logData - mu)**2) / (2 * sigma**2) - np.sum(np.log(positiveData)) aic = 2 * 2 - 2 * logLik results.append((distName, {'mu': mu, 'sigma': sigma}, aic)) elif distName == 'exponential': positiveData = data[data > 0] if len(positiveData) > 0: lambdaParam = 1.0 / np.mean(positiveData) logLik = len(positiveData) * np.log(lambdaParam) - lambdaParam * np.sum(positiveData) aic = 2 * 1 - 2 * logLik results.append((distName, {'lambda': lambdaParam}, aic)) elif distName == 'gamma': positiveData = data[data > 0] if len(positiveData) > 0: meanData = np.mean(positiveData) varData = np.var(positiveData, ddof=1) alpha = meanData**2 / varData beta = meanData / varData results.append((distName, {'alpha': alpha, 'beta': beta}, 0)) except Exception: pass results.sort(key=lambda x: x[2]) return results
[docs] def aic(logLikelihood, nParams): """ Akaike Information Criterion. Parameters: logLikelihood: log-likelihood value nParams: number of parameters Returns: AIC value """ return 2 * nParams - 2 * logLikelihood
[docs] def bic(logLikelihood, nParams, nObs): """ Bayesian Information Criterion. Parameters: logLikelihood: log-likelihood value nParams: number of parameters nObs: number of observations Returns: BIC value """ return np.log(nObs) * nParams - 2 * logLikelihood