[WITH CODE] Risk Engine: Computing optimal targets
When RANSAC and Conformal Prediction converge to estimate how much and how far to lose money
Table of contents:
Introduction.
Local extrema detection.
Initialization.
Defining the trend.
Incremental processing.
Calculations between extrema.
Robust model estimation.
Uncertainty quantification via split Conformal Prediction.
Before you begin, remember that you have an index with the newsletter content organized by clicking on “Read full story” in this image.
Introduction
Risk management isn’t about predicting the future—it’s about preparing for it. Markets are riddled with traps: sudden reversals, hidden volatility, and false signals that lure even seasoned traders into costly mistakes. This technique wasn’t built to chase profits; it was crafted to survive.
Imagine you’re navigating a mountain pass in the dark. One misstep—a missed peak, an unseen valley—and disaster strikes. Financial markets are no gentler. The algorithm we’ll unpack here acts as your headlamp, scanning every twist and turn in the data to flag inflection points where risk spikes or recedes. It answers the critical question: When does a trend turn from ally to enemy?
Traditional risk models often lag, reacting only after the damage is done. Ours is different. By hunting for local extrema—those fleeting moments where prices pivot—it spots fractures in trends before they cascade into crises. Think of it as a seismograph for financial tremors: subtle shifts that hint at coming quakes.
By quantifying the magnitude of market swings and wrapping predictions in statistically sound uncertainty bands, it transforms raw data into a playbook for resilience. Whether guarding a portfolio against crashes or timing exits before a bubble bursts, these tools turn volatility from a threat into a mapped minefield.
Local extrema detection
Before delving into our risk engine, we need to set the stage with some fundamental concepts. We denote our time series by a function
where each index i corresponds to a sample s(i). Think of s(i) as the price or rate at time i. The overall goal is to detect local extrema—local minima or “bottoms” and local maxima or “peaks”—by monitoring changes in the direction of s(i) as we move sequentially through the data. The functions that follow implement this by tracking the trend and then, upon detecting a reversal, marking the previously recorded candidate as an extremum.
You can expect something like:
Let's see how it works.
Initialization
So the first step is to extract meaningful signals from our time series by identifying turning points—peaks and bottoms—that often correspond to important market events. These extrema help us understand when the market might be poised for a reversal.
Our algorithm initializes by setting the candidate index c=0 and candidate value s(c)=s(0). Additionally, we define a trend variable τ∈{−1,0,1}, where:
τ=1 indicates that the series is currently increasing,
τ=−1 indicates that the series is decreasing,
τ=0 means the trend has not yet been established.
Simultaneously, we construct an output signal array extrema(i) such that:
This signal array acts as our signal detector, flagging potential turning points in the market.
Defining the trend
Once the algorithm begins iterating from i=1, it tries to establish a trend. If the trend is undefined (τ=0), it computes:
Here, the sign function sgn(x) tells us whether our time series is initially rising or falling. This is starting to get interesting.
Incremental processing
Once a trend is defined, the algorithm processes each new data point in one of two ways:
Case A - Increasing trend (τ=1):
If
\(s(i) \ge s(c),\)the series continues to rise. Hence, the candidate is updated:
\(c \leftarrow i \quad \text{and} \quad s(c) \leftarrow s(i).\)If, however, s(i)<s(c), then a reversal occurs. The candidate c is deemed a local maximum or peak and is flagged as:
\(\text{extrema}(c) = -1.\)The trend is then switched to decreasing (τ←−1) and the candidate is reset:
\(c \leftarrow i, \quad s(c) \leftarrow s(i).\)Case B - Decreasing trend (τ=−1):
If
\(s(i) \le s(c),\)the series continues to fall, so we update:
\(c \leftarrow i \quad \text{and} \quad s(c) \leftarrow s(i).\)When s(i)>s(c), the candidate is marked as a local minimum (bottom):
\(\text{extrema}(c) = 1.\)Then, the trend is switched to increasing (τ←1) and the candidate is reset:
\(c \leftarrow i, \quad s(c) \leftarrow s(i).\)
This incremental procedure effectively implements the following idea:
A local maximum is detected at index c if there exists a neighborhood around ccc such that
\(s(c) \ge s(j) \quad \text{for } j \text{ in some neighborhood of } c,\)with the series rising before ccc and falling after ccc.
Similarly, a local minimum is detected at ccc if
\(s(c) \le s(j) \quad \text{for } j \text{ in a neighborhood of } c,\)with the series falling before and rising after c.
A key aspect of this method is its inherent delay of one sample. The algorithm needs an extra data point to confirm that the trend has reversed. So don't even think about using it as a method to enter the market. Worst case, the signal will be late and the trade won't be opened.
Time to check how to implement that:
import numpy as np
def find_extrema(series):
"""
Detect local extrema (peaks and bottoms) in a 1D time series.
Given a one-dimensional array `series`, this function returns an array of the same length,
where:
- 1 indicates a local minimum (bottom),
- -1 indicates a local maximum (peak),
- 0 indicates no local extremum.
The function processes the series incrementally. When a new data point confirms a change in trend,
the candidate extremum is flagged (e.g., a transition from increasing to decreasing marks the candidate
as a local maximum).
Note:
Confirming an extremum requires the following data point, resulting in a minimum delay of one sample
in signal detection, which is inherent to the definition of local extrema.
Parameters:
series (array_like): A one-dimensional array of numerical values representing the time series.
Returns:
numpy.ndarray: An integer array of the same length as `series` containing:
1 for local minimum, -1 for local maximum, and 0 for non-extremum points.
"""
series = np.asarray(series)
n = series.size
# Output array for signals
extrema = np.zeros(n, dtype=int)
# Incremental state:
# trend: 1 = increasing trend, -1 = decreasing trend, 0 = undefined yet.
trend = 0
# Candidate for extremum: index and value (updated while the trend is maintained)
candidate_index = 0
candidate_value = series[0]
# Process data starting from the second element
for i in range(1, n):
current_value = series[i]
# If the trend is not yet defined, attempt to define it:
if trend == 0:
if current_value > candidate_value:
trend = 1
candidate_index = i
candidate_value = current_value
elif current_value < candidate_value:
trend = -1
candidate_index = i
candidate_value = current_value
# If values are equal, trend remains undefined and candidate is retained.
continue
# Process based on the current trend:
if trend == 1: # Increasing trend
if current_value >= candidate_value:
# Trend continues; update candidate for potential maximum.
candidate_index = i
candidate_value = current_value
else:
# Trend inversion: from increasing to decreasing.
# Candidate was the highest point, so mark it as a local maximum.
extrema[candidate_index] = -1
# Update trend to decreasing
trend = -1
# Reset candidate using the current data point
candidate_index = i
candidate_value = current_value
elif trend == -1: # Decreasing trend
if current_value <= candidate_value:
# Trend continues; update candidate for potential minimum.
candidate_index = i
candidate_value = current_value
else:
# Trend inversion: from decreasing to increasing.
# Candidate was the lowest point, so mark it as a local minimum.
extrema[candidate_index] = 1
trend = 1
candidate_index = i
candidate_value = current_value
# The last candidate (final sample) cannot be confirmed without a subsequent data point,
# so it remains unflagged.
return extremaCalculations between extrema
Once local extrema are detected, our next task is to quantify the changes between these turning points. These differences help us capture the magnitude of market moves—this is key for our risk management setup.
Let
denote the signal at each time index. We extract indices i1,i2,…,im where σ(ik)≠0. Then we define:
If a local minimum—where σ(ik)=1—is immediately followed by a local maximum—where σ(ik+1)=−1—he difference is:
\(\Delta_{\text{min}\to\text{max}} = s(i_{k+1}) - s(i_k).\)This value represents an upward move, typically interpreted as a buy signal.
Conversely, if a local maximum—where σ(ik)=−1—is followed by a local minimum—where σ(ik+1)=1—then:
\(\Delta_{\text{max}\to\text{min}} = s(i_{k+1}) - s(i_k),\)often yielding a negative number and suggesting a sell signal.
Basically, if
we compute:
and classify it based on the type of transition.
In addition to pairwise differences, we are interested in the net change over segments of consecutive nonzero signals. Consider a segment starting at index i0 with a given signal σ(i0). As long as the signal remains the same, the base value is s(i0). When the signal changes at index i1 we calculate:
This procedure allows us to capture the aggregate change during intervals when the market exhibits a consistent trend, thus providing a cumulative measure of risk exposure.
To implement this we need to code it like:
def calc_diffs_by_signal(series, signals):
"""
Calculate differences between consecutive local extrema in a time series.
Given a 1D array `series` and an array of signals `signals` (output from the `find_extrema` function),
this function iterates through the signals to calculate the differences between consecutive extrema:
- positive_diffs: The difference between the value at a local maximum (-1 signal) and the value at the
preceding local minimum (1 signal), representing a move from a bottom to a peak.
- negative_diffs: The difference between the value at a local minimum (1 signal) and the value at the
preceding local maximum (-1 signal), representing a move from a peak to a bottom.
Parameters:
series (array_like): A one-dimensional array of numerical values representing the time series.
signals (array_like): A one-dimensional array of signals indicating local extrema
(1 for local minimum, -1 for local maximum, 0 otherwise).
Returns:
tuple: A tuple (positive_diffs, negative_diffs), where:
- positive_diffs is a list of differences for transitions from local minimum to maximum.
- negative_diffs is a list of differences for transitions from local maximum to minimum.
"""
positive_diffs = []
negative_diffs = []
# Variables to store the previous confirmed extremum's signal and index.
prev_signal = None
prev_index = None
for i, signal in enumerate(signals):
if signal != 0:
# If it's the first detected extremum, store it.
if prev_signal is None:
prev_signal = signal
prev_index = i
else:
# Calculate difference only if the signal has changed.
if signal != prev_signal:
# Transition from local minimum (1) to local maximum (-1)
if prev_signal == 1 and signal == -1:
difference = series[i] - series[prev_index]
positive_diffs.append(difference)
# Transition from local maximum (-1) to local minimum (1)
elif prev_signal == -1 and signal == 1:
difference = series[i] - series[prev_index]
negative_diffs.append(difference)
# Update the stored signal and index.
prev_signal = signal
prev_index = i
else:
# If the current signal is the same as the previous one, update the index
# (this may occur if multiple consecutive extrema of the same type are detected).
prev_index = i
return positive_diffs, negative_diffs
def calc_differences(series, signals):
"""
Compute the difference between the current value and the initial value of each non-zero signal segment.
Given a one-dimensional array `series` (for example, opening prices) and an array of signals `signals`
(resulting from the `find_extrema` function), this function iterates over the signals (ignoring zeros)
and calculates the difference between the current value of `series` and the value when the signal started.
Each time the signal changes, the difference is computed and appended to a list.
Returns:
list: A list of all computed differences.
"""
differences = []
# Variables to store the initial signal and its index for the current signal segment.
initial_signal = None
initial_index = None
for i, sig in enumerate(signals):
if sig != 0: # Process only non-zero signals.
if initial_signal is None:
# Store the first non-zero signal and its index.
initial_signal = sig
initial_index = i
else:
if sig != initial_signal:
# Signal change detected: calculate the difference relative to the initial value.
diff = series[i] - series[initial_index]
differences.append(diff)
# Reset the base signal and its index.
initial_signal = sig
initial_index = i
# If the signal is the same, retain the initial index to preserve the base value.
return differencesRobust model estimation
After we have extracted signals and computed differences, the next step is to model the relationship between these differences and time. A robust linear regression is necessary to avoid outliers—those unexpected market shocks that can derail standard estimation techniques.
Here is where RANSAC (RANdom SAmple Consensus) comes into play. It is a method that iteratively selects random subsets to estimate model parameters robustly.
Assume we have training points
where N=len(xtrain) and the linear model is given by:
Here, β0 represents the slope and β1 the intercept. For our purposes, we set m=1—since we have a single explanatory variable—which leads to m+1=2 parameters.
For each iteration in our RANSAC process—with a total of itr iterations):
Randomly select m+1 data points from the training set. Let the indices be i1,i2,…,im.
Construct the design matrix X and response vector Y as follows:
\(X = \begin{pmatrix} x_{i_1} & 1 \\ x_{i_2} & 1 \\ \vdots & \vdots \\ x_{i_{m+1}} & 1 \end{pmatrix}, \quad Y = \begin{pmatrix} y_{i_1} \\ y_{i_2} \\ \vdots \\ y_{i_{m+1}} \end{pmatrix}.\)Solve the equation
\(X {\beta} \approx Y,\)by minimizing the squared error:
\({\beta} = \arg\min_{{\beta}} \|X {\beta} - Y\|_2^2.\)This yields a candidate model with parameters
\({\beta} = [\beta_0, \beta_1]^\top\)
Once the candidate model is computed, we evaluate it against the entire training dataset. For each data point, we compute the predicted value:
and the corresponding squared error:
We then calculate the mean error and standard deviation:
A point is considered an inlier if:
where limit is a user-defined threshold. In mathematical terms, let
with 1{⋅} being the indicator function. The candidate model with the highest inlier count—or one that meets the threshold τ⋅N—is selected as the best model.
This robust approach allows us to mitigate the effect of outliers or inconsistencies, ensuring that the estimated model reflects the underlying trend rather than being skewed by anomalous data points.
Time for implementing this part:
def RANSAC_regression(x_train, y_train, itr, limit, tau, m):
"""
Perform robust linear regression using the RANSAC algorithm.
This function fits a linear model to the provided training data by iteratively selecting random subsets
of data points and evaluating candidate models based on the number of inliers determined using a
Chebyshev inequality-based criterion.
Parameters:
x_train (array_like): 1D array of independent variable values for training.
y_train (array_like): 1D array of dependent variable values for training.
itr (int): Number of iterations to run the RANSAC algorithm.
limit (float): Threshold used in the Chebyshev inequality criterion to determine inliers.
tau (float): Minimum fraction of total data points that must be inliers for a model to be considered acceptable.
m (int): Number of unknowns in the model. For linear regression (slope and intercept), use m = 1
(resulting in m+1 parameters).
Returns:
tuple: A tuple containing:
- best_model (numpy.ndarray): Best-fit model parameters as a 1D array (e.g., [slope, intercept]).
- max_counter (int): The number of inliers for the best model.
Notes:
Candidate models are generated by randomly selecting (m+1) points and solving a least squares problem.
The model with the highest number of inliers is chosen as the best model.
"""
mode = tau * len(y_train)
max_counter = 0
best_model = None
s = len(x_train)
for itr_idx in range(itr):
# Randomly select m+1 points; for a line, m+1 equals 2 points.
indices = np.random.choice(s, m+1, replace=False)
# Build the design matrix and response vector.
X = np.zeros((m+1, m+1))
Y = np.zeros((m+1, 1))
for j in range(m+1):
X[j, :] = np.array([x_train[indices[j]], 1])
Y[j, 0] = y_train[indices[j]]
# Solve for model parameters using least squares.
try:
model, residuals, rank, svals = np.linalg.lstsq(X, Y, rcond=None)
except Exception:
continue
model = model.flatten() # model is [slope, intercept]
# Compute predictions for the entire training set.
y_pred_train = model[0] * x_train + model[1]
error = (y_train - y_pred_train) ** 2
mu = error.mean()
std = error.std()
# Count inliers using a Chebyshev-based criterion.
counter = np.sum(np.abs((error - mu) / std) < (limit / mu))
if counter > max_counter:
max_counter = counter
best_model = model
if counter >= mode:
break
if best_model is None:
best_model = np.zeros(m+1)
return best_model, max_counterUncertainty quantification via split Conformal Prediction
No model is complete without a measure of its uncertainty. In risk management, knowing the reliability of your prediction is crucial. Here, we enhance our robust regression with uncertainty quantification using split conformal prediction—a method that produces prediction intervals with statistical guarantees.
First, we randomly split it into a training set and a calibration set. Let:
The training set is used to fit the robust model, while the calibration set will help us gauge how well the model performs.
\(n_{\text{cal}} = \lfloor \text{cal\_frac} \times n \rfloor, \quad n_{\text{train}} = n - n_{\text{cal}}.\)After fitting the model on the training set, predictions on the calibration set are given by:
\(\hat{y}_i^{\text{cal}} = \beta_0 x_i^{\text{cal}} + \beta_1, \quad i = 1,\dots,n_{\text{cal}},\)and the residuals are:
\(r_i = y_i^{\text{cal}} - \hat{y}_i^{\text{cal}}.\)A robust estimate of the scale of errors is obtained using the median absolute deviation:
\(\text{scale} = \text{median}\left\{|r_i| : i=1,\dots,n_{\text{cal}}\right\} + \epsilon,\)where ϵ is a small constant to avoid division by zero. This step is critical for adjusting for heteroscedasticity—i.e., non-constant variance—in the error terms.
We then compute normalized nonconformity scores:
\(s_i = \frac{|r_i|}{\text{scale}}, \quad i=1,\dots,n_{\text{cal}}.\)Sorting these scores yields:
\(s_{(1)} \le s_{(2)} \le \cdots \le s_{(n_{\text{cal}})}.\)We choose the quantile index:
\(k = \lceil (n_{\text{cal}}+1)(1-\alpha) \rceil - 1,\)so that the normalized conformal quantile is:
\(Q_{\text{norm}} = s_{(k)}.\)Rescaling gives the final nonconformity threshold:
\(Q = Q_{\text{norm}} \times \text{scale}.\)For a new data point the model predicts:
\(\hat{y}_{\text{last}} = \beta_0 \, x_{\text{last}} + \beta_1.\)The conformal prediction interval is then defined as:
\(\left[\hat{y}_{\text{last}} - Q, \; \hat{y}_{\text{last}} + Q\right].\)This interval is guaranteed, under mild assumptions, to contain the true response with probability approximately 1−α.
Okay! How to do all of that?






