import numpy as np
def _minimize(func, x0, bounds=None, maxIter=1000):
"""Simple gradient-free optimization using pattern search."""
x = np.array(x0, dtype=float)
n = len(x)
if bounds is None:
bounds = [(-np.inf, np.inf)] * n
stepSize = 0.1
minStep = 1e-8
bestF = func(x)
for iteration in range(maxIter):
improved = False
for i in range(n):
xTry = x.copy()
xTry[i] += stepSize
xTry[i] = np.clip(xTry[i], bounds[i][0], bounds[i][1])
fTry = func(xTry)
if fTry < bestF:
x = xTry
bestF = fTry
improved = True
continue
xTry = x.copy()
xTry[i] -= stepSize
xTry[i] = np.clip(xTry[i], bounds[i][0], bounds[i][1])
fTry = func(xTry)
if fTry < bestF:
x = xTry
bestF = fTry
improved = True
if not improved:
stepSize *= 0.5
if stepSize < minStep:
break
return x
[docs]
def blackLitterman(covMatrix, pi, P, Q, tau=0.05):
"""
Black-Litterman model for portfolio optimization.
Parameters:
covMatrix: covariance matrix of returns
pi: equilibrium (market-implied) returns
P: views matrix (rows=views, cols=assets)
Q: view returns vector
tau: prior uncertainty (default 0.05)
Returns:
adjusted expected returns
"""
tauCov = tau * covMatrix
PtauCovPt = P @ tauCov @ P.T
PtauCovPtInv = np.linalg.inv(PtauCovPt)
tauCovInv = np.linalg.inv(tauCov)
MInv = np.linalg.inv(tauCovInv + P.T @ PtauCovPtInv @ P)
adjustedReturns = MInv @ (tauCovInv @ pi + P.T @ PtauCovPtInv @ Q)
return adjustedReturns
[docs]
def meanVariance(expectedReturns, covMatrix, riskAversion=0.5):
"""
Mean-variance optimization.
Parameters:
expectedReturns: expected returns for each asset
covMatrix: covariance matrix
riskAversion: risk aversion parameter (higher = more risk averse)
Returns:
optimal portfolio weights
"""
nAssets = len(expectedReturns)
def objective(weights):
portReturn = weights @ expectedReturns
portVol = np.sqrt(weights @ covMatrix @ weights)
return -portReturn / portVol
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(objective, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def minVariance(covMatrix):
"""
Minimum variance portfolio.
Parameters:
covMatrix: covariance matrix
Returns:
minimum variance portfolio weights
"""
nAssets = len(covMatrix)
def objective(weights):
return weights @ covMatrix @ weights
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(objective, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def riskParity(covMatrix):
"""
Risk parity portfolio optimization.
Parameters:
covMatrix: covariance matrix
Returns:
risk parity portfolio weights
"""
nAssets = len(covMatrix)
def objective(weights):
portVar = weights @ covMatrix @ weights
marginalRisks = covMatrix @ weights
riskContribs = weights * marginalRisks / (portVar + 1e-10)
target = 1.0 / nAssets
return np.sum((riskContribs - target)**2)
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(objective, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def equalWeight(nAssets):
"""
Equal weight portfolio.
Parameters:
nAssets: number of assets
Returns:
equal weight portfolio weights
"""
return np.ones(nAssets) / nAssets
[docs]
def maxDiversification(covMatrix):
"""
Maximum diversification portfolio.
Parameters:
covMatrix: covariance matrix
Returns:
maximum diversification portfolio weights
"""
nAssets = len(covMatrix)
volatilities = np.sqrt(np.diag(covMatrix))
def objective(weights):
weightedVol = weights @ volatilities
portVol = np.sqrt(weights @ covMatrix @ weights)
return -weightedVol / (portVol + 1e-10)
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(objective, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def tangency(expectedReturns, covMatrix, riskFreeRate=0):
"""
Tangency portfolio (maximum Sharpe ratio).
Parameters:
expectedReturns: expected returns
covMatrix: covariance matrix
riskFreeRate: risk-free rate
Returns:
tangency portfolio weights
"""
nAssets = len(expectedReturns)
excessReturns = expectedReturns - riskFreeRate
def objective(weights):
portReturn = weights @ excessReturns
portVol = np.sqrt(weights @ covMatrix @ weights)
return -portReturn / (portVol + 1e-10)
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(objective, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def maxSharpe(expectedReturns, covMatrix, riskFreeRate=0):
"""
Maximum Sharpe ratio portfolio (alias for tangency).
Parameters:
expectedReturns: expected returns
covMatrix: covariance matrix
riskFreeRate: risk-free rate
Returns:
maximum Sharpe portfolio weights
"""
return tangency(expectedReturns, covMatrix, riskFreeRate)
[docs]
def efficientFrontier(expectedReturns, covMatrix, nPoints=50):
"""
Compute efficient frontier.
Parameters:
expectedReturns: expected returns
covMatrix: covariance matrix
nPoints: number of points on frontier
Returns:
list of dicts with returns, volatility, weights
"""
minVar = minVariance(covMatrix)
minRet = minVar @ expectedReturns
maxRet = np.max(expectedReturns)
targetReturns = np.linspace(minRet, maxRet, nPoints)
frontier = []
for targetRet in targetReturns:
nAssets = len(expectedReturns)
def objective(weights):
return weights @ covMatrix @ weights
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = initialWeights.copy()
bestVar = objective(bestWeights)
for _ in range(500):
testWeights = np.random.dirichlet(np.ones(nAssets))
testRet = testWeights @ expectedReturns
if abs(testRet - targetRet) < 0.01:
testVar = objective(testWeights)
if testVar < bestVar:
bestWeights = testWeights
bestVar = testVar
weights = bestWeights
portRet = weights @ expectedReturns
portVol = np.sqrt(weights @ covMatrix @ weights)
frontier.append({
'return': portRet,
'volatility': portVol,
'weights': weights
})
return frontier
[docs]
def hierarchicalRiskParity(covMatrix, returns):
"""
Hierarchical risk parity portfolio allocation.
Parameters:
covMatrix: covariance matrix
returns: historical returns (for correlation)
Returns:
HRP portfolio weights
"""
nAssets = len(covMatrix)
corr = covMatrix / np.outer(np.sqrt(np.diag(covMatrix)), np.sqrt(np.diag(covMatrix)))
dist = np.sqrt((1 - corr) / 2)
clusters = [[i] for i in range(nAssets)]
while len(clusters) > 1:
minDist = float('inf')
mergeI, mergeJ = 0, 1
for i in range(len(clusters)):
for j in range(i + 1, len(clusters)):
clusterDist = 0
count = 0
for a in clusters[i]:
for b in clusters[j]:
clusterDist += dist[a, b]
count += 1
avgDist = clusterDist / count
if avgDist < minDist:
minDist = avgDist
mergeI, mergeJ = i, j
clusters[mergeI].extend(clusters[mergeJ])
del clusters[mergeJ]
def allocate(cluster):
if len(cluster) == 1:
return {cluster[0]: 1.0}
mid = len(cluster) // 2
left = cluster[:mid]
right = cluster[mid:]
leftVar = covMatrix[np.ix_(left, left)].sum()
rightVar = covMatrix[np.ix_(right, right)].sum()
totalVar = leftVar + rightVar
leftWeight = 1.0 - leftVar / totalVar
rightWeight = 1.0 - rightVar / totalVar
leftAlloc = allocate(left)
rightAlloc = allocate(right)
allocation = {}
for asset, weight in leftAlloc.items():
allocation[asset] = weight * leftWeight
for asset, weight in rightAlloc.items():
allocation[asset] = weight * rightWeight
return allocation
allocation = allocate(clusters[0])
weights = np.zeros(nAssets)
for asset, weight in allocation.items():
weights[asset] = weight
weights = weights / np.sum(weights)
return weights
[docs]
def minCvar(expectedReturns, returns, alpha=0.95):
"""
Minimum CVaR (Conditional Value at Risk) portfolio.
Parameters:
expectedReturns: expected returns
returns: historical returns matrix (nSamples x nAssets)
alpha: confidence level
Returns:
minimum CVaR portfolio weights
"""
nAssets = returns.shape[1]
def cvar(weights):
portReturns = returns @ weights
var = np.percentile(portReturns, (1 - alpha) * 100)
tailLosses = portReturns[portReturns <= var]
return -np.mean(tailLosses) if len(tailLosses) > 0 else 0
initialWeights = np.ones(nAssets) / nAssets
bounds = [(0, 1) for _ in range(nAssets)]
bestWeights = _minimize(cvar, initialWeights, bounds=bounds)
bestWeights = bestWeights / np.sum(bestWeights)
return bestWeights
[docs]
def maxReturn(expectedReturns):
"""
Maximum return portfolio (100% in highest expected return asset).
Parameters:
expectedReturns: expected returns
Returns:
maximum return portfolio weights
"""
weights = np.zeros(len(expectedReturns))
weights[np.argmax(expectedReturns)] = 1.0
return weights