Table of contents:
Introduction.
The API layer.
Protocol semantics, transport choice, and system state.
Risk and financial APIs limitations.
Historical-live non-equivalence and data reconstruction error.
Execution state machines and order routing.
Simulation and live trading.
Creating the architecture for a trading API.
Before you begin, remember that you have an index with the newsletter content organized by clicking on “Read the newsletter index” in this image.
Introduction
Before moving on to the next series, there’s an important point that any algorithmic trader needs to know: the point about APIs.
A quant can state the issue in simple terms. Let st denote latent market state and let xt denote observed state at decision time. The strategy trades on xt rather than st. The transformation from st to xt includes vendor capture, transport, buffering, schema validation, clock alignment, and local state reconstruction. When that transformation changes, the strategy changes with it, even if the signal formula stays fixed.
The API layer belongs to the hypothesis itself because it determines the admissible information set on which the strategy acts.
This point matters because the gap between research and deployment tends to emerge at the boundary where information becomes software state. Bars in a notebook arrive sorted and complete. Live messages arrive through sessions, quotes, reconnect paths, and transport rules. Orders in a backtest pass from signal to fill in one step. Orders in production pass through acceptance, routing, queue interaction, etc. In a systematic process, those choices carry mathematical weight because they alter the state space on which the strategy operates.
The live strategy therefore admits a fuller description than the research signal alone. A useful expression is
Many teams place their energy on features, models, and optimization while the interface layer receives less formal treatment. That habit carries a cost. A signal derived from one information regime and deployed through another regime becomes a different object. A simulator that treats execution as an atomic event studies a market process distinct from the one live code encounters.
The API layer is where these fractures surface. It is also where discipline can restore coherence. If you want a stack to start with, here it is:
An API is an information contract, a timing contract, and an execution contract. That’s to say, data fields need causal meaning, timestamps need clear authority, order states need legal transitions, internal services need shared schemas across research, simulation, and live trading. Do you get the ida right? Under those conditions, performance claims acquire stronger footing because the path from observation to action remains explicit.
The subject here is the form through which market enters the strategy and the form through which strategy intent reaches the market. Code style, framework choice, and endpoint count matter through their effect on that form. A live trading stack earns coherence when the objects that move across it stay explicit, typed, and stable across the full path from research to execution. That coherence is a design choice and it begins at the API layer.
The API layer
In a trading system, the central question asks what is known, when it becomes known, and which representation carries it. The API layer lives at that boundary. It defines the admissible information set available to the strategy at decision time. That framing captures the practical source of failure with precision.
Let the latent market state at event index t be st. The strategy never accesses that state in raw form. It receives an encoded view xt, produced through venue generation, vendor capture, normalization, transport, buffering, decoding, and local deserialization. If those operations are compressed into a single operator H, then the strategy acts on
In live systems, H belongs to a family of transformations shaped by endpoint choice, session state, codec, throttling behavior, aggregation policy, and recovery logic. The research question therefore asks two things at once: whether a mapping f(xt) carries predictive value, and whether the xt used in research belongs to the same family as the xt available in deployment.
A useful decomposition is
The term Ht(s0:t) shows that observed state may depend on a path rather than a single contemporaneous latent state. Corrections, delayed packets, local aggregation, and dropped messages create that path dependence. The residual term εt collects the user-facing uncertainty generated by packet loss, schema surprises, decoder failure, clock mismatch, and hidden vendor behavior. In a calm hourly-bar strategy, εt may occupy a small role. In a short-horizon or execution-sensitive strategy, it often defines the core challenge.
This perspective dissolves a common misconception. Many quants treat the API as a latency channel and connect latency risk with high-frequency trading. A richer view sees the API as a state-visibility mechanism. Consider a breakout rule triggered when price crosses a level and volume in the same interval exceeds a threshold. One endpoint may publish aggregated bars with finalized volume, while another may publish incremental trades that require local accumulation. The rule then sees different objects.
Speed alone does not resolve that difference. One representation is final. The other is provisional. One is bounded by bar closure. The other evolves through the interval. If research evaluates the final object while deployment trades the provisional one, then backtest and live trading inhabit different information worlds.
The official protocol and framework documentation supports this view from the software side. HTTP semantics are defined as stateless at the application-protocol level. That property makes HTTP well suited for resource retrieval, explicit commands, and idempotent inspection, while state continuity moves into higher layers. The OpenAPI specification makes HTTP service capabilities machine-readable through paths, parameters, schemas, and responses. FastAPI builds on that structure by generating OpenAPI descriptions and interactive documentation from Python declarations. Pydantic treats typed data models as executable contracts and emits JSON Schema from those models. Finance gives special weight to this alignment because state integrity sits at the center of the problem.
For a systematic trading system, the internal data model becomes a compact statement of admissible knowledge. A well-defined Bar object can enforce monotonic timestamps, nonnegative volume, high-low consistency, a declared timezone, and provenance metadata. A Signal object can enforce a causal timestamp, a horizon, a generation identifier, and a confidence field interpreted consistently by the risk layer. An OrderIntent object can separate intent from broker acknowledgment, so the system preserves a clean distinction between desired action, accepted action, and executed action. These structures carry operational value because they keep portfolio state, execution state, and research state aligned across time.
Protocol semantics, transport choice, and system state
A familiar hierarchy pairs REST with slower interactions, WebSocket with streaming interactions, and FIX with institutional workflows. A stronger formulation sees each protocol as an organizer of interaction and, through that role, as a designer of local architecture.
Start with HTTP. Under RFC 9110, HTTP defines resource-oriented request-response semantics, methods, status codes, headers, and representations. It serves clear operations with strong structure: fetching historical bars, querying account state, placing an order, canceling an order, requesting configuration, and obtaining a health report. It also supports documentation with high clarity.
For internal trading services, that structure has high value. The stack gains clear contracts, reproducible input schemas, and rapid client generation.
HTTP also channels interaction through discrete exchanges. Applications that consume a continuous stream of trade events, order updates, heartbeats, or PnL deltas benefit from WebSocket. RFC 6455 defines a bidirectional protocol over TCP with an opening handshake and framed messages, built for two-way communication.
For example, Alpaca’s streaming documentation—I mean, the broker—presents trade, account, and order updates over WebSocket and highlights frame types across endpoints, plus authorization and stream subscription after connection. For a quant system, the implication is clear: WebSocket provides continuity, and continuity assigns responsibility. A persistent connection requires reconnect handling, duplicate control, interval recovery, subscription replay, sequence validation, and local buffering.
{
"action": "auth",
"key": "{YOUR_API_KEY_ID}",
"secret": "{YOUR_API_SECRET_KEY}"
}FIX addresses a different problem. The FIX Trading Community describes FIX as:
A standardized transaction language across the securities trade cycle, with message types aligned to steps such as quote request and new order, and with standardized fields as the building blocks.
That matters because institutional trading depends on precise business semantics across a workflow that spans indication, order, execution report, allocation, and post-trade reporting. FIX draws strength from a protocol that already speaks the language of the transaction. A modern retail or semi-professional quant stack may begin on other connectivity paths, yet it can still absorb the lesson: trading systems gain stability when message meanings stay explicit, durable, and tied to the lifecycle they govern.
The mathematics of transport stays simple. Let Ttotal denote end-to-end delay from the moment a signal becomes ready to the moment a fill event reaches local state. Then
A robust analysis evaluates the whole sum. In many implementations, Tbroker and Treconcile generate more variation than raw network travel time. Serialization, authentication, request signing, broker throttling, internal queuing, and local reconciliation can dominate the tails. Tail behavior matters more than averages because losses tend to concentrate there. A mean delay of 20 ms with a 95th percentile of 60 ms may fit one strategy and damage another whose edge concentrates in short-lived bursts.
The histogram decomposes end-to-end order latency into plausible components and shows the aggregate distribution. The plot reveals that total latency is a sum of heterogeneous terms and that the tail can widen even when most components remain well behaved.
Risk and financial APIs limitation
Once the API is treated as a formal component, the next task is classification. Risk in financial APIs represents a family of failure modes that interact. A useful taxonomy separates at least seven categories: data risk, temporal risk, schema risk, session risk, throughput risk, execution risk, and governance risk.
Data risk concerns what is transmitted. This includes missing fields, inconsistent bar construction, delayed corrections, undocumented transformations, and ambiguity about what a timestamp refers to. Is the bar timestamp the interval open, the interval close, or publication time? Are trades condition-filtered? Are odd lots included? Is volume consolidated at the same cadence in historical and live endpoints? Vendor documentation sometimes answers these questions, sometimes not. The point is that the risk exists even when the feed looks clean. A clean feed can still describe the wrong object for the strategy.
Temporal risk concerns when the data is available and in which order. Clock drift between client, broker, and data vendor is the obvious part. More subtle is the distinction between event time and arrival time. A message may encode one time but arrive at another. Streaming systems can also deliver temporary disorder. If the strategy assumes a total order while the transport offers only a best-effort order plus reconnect recovery, then the local state machine must restore order or at least detect that order was broken.
Schema risk concerns how the data is structured. An endpoint that changes a field name, adds a nullable attribute, changes a numeric type from integer to string, or alters nesting may be perfectly legal at the API level and still break live trading. This is why type validation libraries matter.
Session risk is operational but critical. Authentication expiry, refresh tokens, signed requests, environment separation, and broker session exclusivity determine whether the trading path remains live. Some APIs are quite clear that trading-enabled sessions have different restrictions than generic access, including brokerage-level single-session limits.
Throughput risk includes rate limits, quotas, pagination, backpressure, and resource exhaustion. This category is often underestimated by quants who work first in notebooks. Yet the feasible research surface is shaped by what the API allows. If a cross-sectional study requires more requests per minute than the provider permits, then the signal either needs a different collection design, a local cache, a licensed feed, or a narrower universe. Throughput is a constraint on the set of admissible strategies.
Execution risk concerns the gap between request acceptance and actual market interaction. Here a good example of why this matters: “the order object has identifiers, statuses, and a lifecycle that can be queried after placement. That is useful, but it also confirms the obvious point that the initial placement call is only one transition in a larger process. SEC routing and execution disclosures exist because the path from broker to venue materially affects outcomes”. In trading, the request path and the execution path should never be conflated.
Governance risk covers versioning, deprecation, environment drift, and organizational ambiguity. An internal API that is undocumented, lacks version tags, or mixes research-only fields with live-only fields can corrupt an otherwise sound strategy. Governance is what keeps the contract stable long enough for experiments to be comparable.
A convenient formalization is to model observed system risk as the union of these categories:
Each component can be associated with observables: gap counts, duplicate counts, validation failures, authentication renewals, request rejections, order rejects, and contract-version mismatches. The important point is that API risk becomes measurable only after categories are explicit.
The table below compresses this pitfalls:
Throughput deserves one extra comment because it changes research conclusions. Assume that trade opportunities survive for a characteristic time constant τ and that the system polls every Δ units of time rather than consuming a push stream. A rough first-order miss model is:
This formula is for illustrative purposes. It says that when opportunity duration is short relative to the observation interval, miss probability rises quickly. Coarser polling changes the distribution of observed opportunities. The picture below visualizes that relationship:
Quants often frame API problems as reliability problems. They are also selection problems. The feed, the protocol, and the quota choose which events you ever get the chance to model. That means a strategy discovered under one data-access regime may not even be defined under another. Once this is understood, the response becomes sharper.
Historical-live non-equivalence and data reconstruction error
Historical endpoints and live endpoints are seldom equivalent in the strict sense required by causal research. They may deliver related objects, but related is not enough. Historical data is often cleaned, deduplicated, repaired, aligned to bar boundaries, and packaged into stable records. Live data is incremental, provisional, and susceptible to temporary disorder. A strategy that learns on one and trades on the other inherits a reconstruction problem.
There are at least four sources of non-equivalence:
The first is finalization: Historical bars tend to be final objects. Live bars, unless the provider specifically publishes only completed intervals, may be in progress.
The second is correction policy: Historical vendors can revise prior records after exchange corrections or internal data-quality routines. Live systems receive the pre-correction path and may or may not later receive explicit correction messages.
The third is aggregation method: A historical endpoint may aggregate on server-side rules that differ from a client’s local reconstruction from trade ticks.
The last one is delivery semantics: Historical data is fetched as a completed array. Live data is delivered as a stream in which messages can be delayed, duplicated, or arrive after reconnect.
Any research process should ask a simple but uncomfortable question: is the historical object used in research reproducible from the live stream using only information available at the time? If the answer is no, then the strategy is at risk of hidden lookahead or deployment drift. Sometimes the mismatch is harmless. Often it is not. Opening-range logic, event-triggered strategies, and execution-sensitive systems are especially vulnerable because they depend on exact temporal boundaries.
One clean way to formalize the problem is to define a reconstruction operator G that maps a live stream path into a bar or state record:
The historical dataset supplies bthist. The research assumption that often goes unstated is that bthist = btlive_recon. This is rare. The true object of concern is the reconstruction error:
A strategy that is stable under small δt may survive deployment. A strategy that changes sign, entry timing, or position size under small δt may lose money even if the backtest looks good. This is one reason why mature groups insist on replay tests. They record live messages, reconstruct the internal state offline, and compare it with both the production decision log and the historical vendor view.
Out-of-order delivery is a particularly sharp example. Suppose messages carry event timestamps ei but arrive at times ai. A client that sorts by arrival time instead of event time assumes that sign(ai - aj) equals sign(ei - ej) for all pairs. That assumption is false in many real networks. Once it breaks, local bars, rolling indicators, or event-triggered conditions may shift. The correct response is to build reordering logic with bounded buffers and explicit policy for what to do when order cannot be restored with confidence.
Here is a minimal sketch of such a guard:
from dataclasses import dataclass
from heapq import heappush, heappop
@dataclass(order=True)
class MarketEvent:
seq: int
ts_event_ns: int
payload: dict
class ReorderBuffer:
def __init__(self, max_gap: int = 32):
self.expected = 0
self.max_gap = max_gap
self.heap = []
def push(self, event: MarketEvent):
if event.seq < self.expected:
return [] # duplicate or stale replay
heappush(self.heap, event)
out = []
while self.heap and self.heap[0].seq == self.expected:
out.append(heappop(self.heap))
self.expected += 1
if self.heap and self.heap[0].seq - self.expected > self.max_gap:
raise RuntimeError("sequence gap too large")
return outThe point of the snippet is to show the shape of the invariant. Local state should advance only when sequence integrity is acceptable. If the gap exceeds a declared tolerance, the client should raise an incident, resubscribe, or rebuild state from a safer source.
The next plot shows the operational consequence of delayed observations on a simple synthetic crossover-style entry. The delayed observation sees the same price path later and enters later. The lesson is that even modest observation delay can transform entry timing in a way that a clean historical backtest never exposed.
A common response is to smooth the signal until these issues disappear. That can help, but it does not solve the identification problem. If the strategy only works when final corrected bars are used, then the strategy depends on a state that may not exist in live trading.
First, define decision-time objects. Second, test historical-live parity by replaying captured live streams into the same internal schemas used in production. Third, store provenance metadata with every derived bar or indicator: source, reconstruction mode, timezone, sequence completeness, and whether the object is provisional or finalized. Fourth, make non-equivalence visible in research by introducing uncertainty envelopes. If a signal flips under plausible reconstruction perturbations, that signal should not be promoted.
Execution state machines and order routing
The industry still tolerates expressions like: “send the order”, “the order is in”, “the order got filled”. For software, these phrases hide critical distinctions. For PnL, they hide money. However, execution should be modeled as a state machine.
A request to place an order is only a statement of intent until the broker acknowledges it. The acknowledgment is not yet a fill or the fill may be partial. The remaining quantity may rest, cancel, expire, or be modified by subsequent logic. Some APIs reflect these transitions in a REST resource that can be queried after placement; others also emit asynchronous trade or order updates over a stream; sometimes users place, monitor, and cancel orders through the Trading API, and that order identifiers and status objects support later inspection. The best approach is to maintain two separate layers: account and order routing. That separation is healthy because it prevents one channel from pretending to be the whole lifecycle.
Okay, let’s talk about the second one: routing. SEC rules on order execution and order routing disclosure exist because the path an order takes after leaving the customer-facing system affects economic results. A retail API client sees only part of the venue-level journey, and the system designer should treat that journey as relevant. Even if the internal API abstracts away venue detail, it should preserve enough state to distinguish user intent, broker acceptance, downstream routing outcome, and final position reconciliation.
A minimal lifecycle can be written as:
with additional branches for rejected, canceled, expired, and replaced. Not every broker surfaces each branch in the same way. The internal system should anyway. Doing so has three advantages. First, it prevents false certainty. Second, it enables reconciliation. Third, it supports simulation. A sound backtest represents the full state machine between signal and filled position, because those intermediate transitions reveal failure modes that materially shape PnL.
Consider expected execution price. A crude but useful decomposition is:
Here Pref is a chosen reference price, Squeue captures queue-position effects and spread interaction, Slatency captures adverse movement while the order is in flight, Simpact captures self-induced price movement for non-negligible size, and Sfees captures commissions, exchange fees, rebates, and financing where relevant. Many prototype systems focus on Slatency and ignore the rest. That is tolerable for a toy simulator, but not for a research workflow that wants to claim tradability. Even a simple simulator should at least model partial fills, expiry, and side-dependent slippage under volatility.
An internal order model should therefore separate three objects:
OrderIntent: what the strategy wants.
BrokerOrderState: what the broker has acknowledged and how it has evolved.
PositionLedgerEntry: what the portfolio believes happened after reconciliation.
Conflating them creates the classic bug where the strategy believes it is flat because its own intent was to exit, while the broker still shows a partially open quantity.
A minimal type sketch might look like this:
from enum import Enum
from pydantic import BaseModel, Field, field_validator
from typing import Optional
class OrderSide(str, Enum):
BUY = "BUY"
SELL = "SELL"
class OrderState(str, Enum):
INTENT = "INTENT"
ACCEPTED = "ACCEPTED"
WORKING = "WORKING"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
REJECTED = "REJECTED"
CANCELED = "CANCELED"
EXPIRED = "EXPIRED"
class OrderIntent(BaseModel):
client_order_id: str
symbol: str
side: OrderSide
qty: float = Field(gt=0)
limit_price: Optional[float] = Field(default=None, gt=0)
class BrokerOrderState(BaseModel):
client_order_id: str
broker_order_id: Optional[str] = None
state: OrderState
filled_qty: float = Field(ge=0)
avg_fill_price: Optional[float] = None
@field_validator("avg_fill_price")
@classmethod
def positive_price_if_present(cls, v):
if v is not None and v <= 0:
raise ValueError("avg_fill_price must be positive")
return vThe exact fields can be adjusted. Once these objects are defined, live logic becomes cleaner. Reconciliation compares the latest broker state with the internal ledger. Simulation advances through legal states instead of jumping from signal to fill. Risk logic defines kill-switch conditions in terms of state divergence, not only PnL.
Quants often focus on price improvement as the hard part of execution. In many systems, the hard part is state certainty. A robust system determines with high confidence whether an order is live, whether a cancel request succeeded, and whether a partial fill changed net exposure. Alpha only matters when the process itself is reliable. The real objective is to build an execution engine whose states are clear enough that later improvements rest on solid assumptions.
This leads to prototype API design. Even at the prototyping stage, order endpoints benefit from idempotency, accept client-generated identifiers, distinguish acceptance from fill, and expose a machine-readable state model. Internal consumers rely on structured state as the source of truth. Human-readable logs still help, but they support the contract rather than define it.
The external side of the problem therefore becomes clear: financial APIs present data and orders as evolving states carried through channels with meaningful semantics. The constructive response is to assign those states formal names and transitions. Once that structure is in place, the internal API becomes the stable surface across which research, simulation, and live code remain aligned.
Simulation and live trading
One script uses the same field names as the next, timezones remain consistent across services, the execution engine works with normalized order states, the backtester uses the same fill object as live trading, and the monitoring service reads position exposure from a shared contract rather than reconstructing it from logs. That is manageable at small scale and essential at medium scale. The solution is to design an internal API.
The main purpose of an internal API is contract preservation. Research, paper trading, backtesting, and live services should consume and emit the same conceptual objects even when their data sources differ. If one system produces Signal(direction, generated_at, horizon, confidence) and another expects alpha_score, entry_ts, and weight_hint, the boundary needs stronger alignment. The internal API creates that alignment by enforcing shared names, units, valid ranges, and temporal interpretation.
A sensible contract design starts with five invariants:
Causality: Every object that can influence a decision carries a timestamp or event key that identifies when it became admissible. This includes bars, features, signals, risk checks, and order updates. With that structure in place, replay testing becomes meaningful.
Provenance: A bar is more than OHLCV. It carries information about whether it came from vendor history, live reconstruction, a synthetic resampler, or a replay archive. That small field turns future debugging into straightforward engineering.
State separability: Data objects, signal objects, order-intent objects, broker-state objects, and portfolio-ledger objects each remain distinct. This separation keeps assumptions from leaking across modules and preserves clarity at the interfaces.
Versioning: Schemas evolve, and they should evolve explicitly. A
schema_versionfield together with artifacted OpenAPI snapshots provides inexpensive and reliable protection.Idempotency: Commands that change state, especially order-related commands, should support replay without accidental duplication. Client order IDs provide the standard mechanism.
These invariants can be supported with concise Pydantic models. Here is a small example:
from datetime import datetime, timezone
from pydantic import BaseModel, Field, field_validator
from typing import Literal
class Bar(BaseModel):
schema_version: str = "1.0"
source: Literal["vendor_history", "live_recon", "replay"]
symbol: str
ts_open: datetime
ts_close: datetime
open: float = Field(gt=0)
high: float = Field(gt=0)
low: float = Field(gt=0)
close: float = Field(gt=0)
volume: float = Field(ge=0)
@field_validator("ts_open", "ts_close")
@classmethod
def enforce_timezone(cls, v: datetime):
if v.tzinfo is None:
raise ValueError("timestamps must be timezone-aware")
return v.astimezone(timezone.utc)
class Signal(BaseModel):
schema_version: str = "1.0"
generated_at: datetime
symbol: str
direction: int
horizon_s: int = Field(gt=0)
confidence: float = Field(ge=0.0, le=1.0)
model_tag: strThe snippet already delivers three concrete benefits. Invalid states fail early, schemas become inspectable by both humans and machines and, client libraries can be generated or written against a stable description rather than inherited through oral tradition.
The internal API also makes research and live mode comparable by design. A strong pattern places all market-data acquisition behind adapter interfaces and exposes only canonical internal models to the rest of the stack. The rest of the code then works with the same objects whether bars came from Alpaca, IBKR, a CSV archive, a replay file, or MetaTrader. Sources are not interchangeable, but their specific oddities belong at the boundary, where they can be normalized, instead of spreading through the strategy codebase.
Do you know what is premature? To build a large platform before the contracts are understood. A lean internal API can live inside a single FastAPI application with a few strict models and a small set of endpoints. What matters is that the contracts are defined before the complexity arrives.
A practical minimal endpoint set for a quant prototyping API includes /health, /bars, /signal/run, /orders/simulate, /orders/submit, /positions, /portfolio/state, and /backtest/run. Each one benefits from typed inputs and outputs, deterministic validation, and a precise statement of time semantics.
The internal API also supports replay, allowing a team to verify that the object evaluated in research matches the object the live system would have seen. An endpoint that consumes archived messages and emits canonical internal states often adds more value than a second optimization endpoint. Little by little we will be building things like this:
Creating the architecture for a trading API
Let’s build a contracts, domain state, adapters, services, routers, lifecycle, and local entrypoint. That structure fits FastAPI well because bigger apps are meant to be split into multiple files and grouped with APIRouter, then assembled with include_router(). FastAPI also recommends lifespan for startup/shutdown work, and it supports WebSockets as a first-class transport beside HTTP. Pydantic’s Field(...) and field_validator(...) are the right tools for constraints and cross-field validation in the schema layer.
The target layout is this:
app/
├── __init__.py
├── main.py
├── dependencies.py
├── core/
│ ├── __init__.py
│ └── lifespan.py
├── domain/
│ ├── __init__.py
│ ├── enums.py
│ └── state.py
├── schemas/
│ ├── __init__.py
│ ├── market.py
│ └── trading.py
├── adapters/
│ ├── __init__.py
│ └── synthetic_market.py
├── services/
│ ├── __init__.py
│ └── trading_engine.py
└── routers/
├── __init__.py
├── health.py
├── market.py
├── trading.py
└── stream.pyWhat we are trying to do is:
To keep the market contracts separate from the mutable engine state.
To keep the data source separate from the trading logic.
And to keep the trading logic separate from HTTP and WebSocket transport.
This first part is intentionally tiny. Side is a trading concept, so it belongs in the domain layer. That keeps the meaning of BUY and SELL independent from request parsing or response formatting.
from enum import Enum
class Side(str, Enum):
BUY = "BUY"
SELL = "SELL"The next defines the public market-data contract. A Bar is the canonical market object that moves through the API. BarsResponse is the public envelope returned by the market-data route. Pydantic models are the right place for this because they give you typed structure, field constraints, and schema metadata, and FastAPI uses them for parsing and automatic docs.
from __future__ import annotations
from typing import List
from pydantic import BaseModel, Field
class Bar(BaseModel):
ts: int = Field(..., description="Unix timestamp in seconds")
open: float
high: float
low: float
close: float
volume: float
class BarsResponse(BaseModel):
symbol: str
bars: List[Bar]The important point here is that from now on the rest of the code deals with one well-defined object.
The next chunk defines the public trading contract by covering signal requests, order requests, portfolio views, and backtest results. The validators matter because they stop invalid states before they reach the engine. For example, slow <= fast is malformed input, and Pydantic field validators are made for exactly that kind of check.
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator
from app.domain.enums import Side
class SignalRequest(BaseModel):
symbol: str
fast: int = Field(5, ge=2, le=500)
slow: int = Field(20, ge=3, le=1000)
@field_validator("slow")
@classmethod
def validate_windows(cls, v: int, info: ValidationInfo) -> int:
fast = info.data.get("fast")
if fast is not None and v <= fast:
raise ValueError("slow must be greater than fast")
return v
class SignalResponse(BaseModel):
symbol: str
signal: int
fast_sma: float
slow_sma: float
last_close: float
ts: int
class OrderRequest(BaseModel):
model_config = ConfigDict(validate_assignment=True)
symbol: str
side: Side
qty: float = Field(..., gt=0)
price: Optional[float] = Field(None, gt=0)
class PositionModel(BaseModel):
symbol: str
qty: float
avg_price: float
market_price: float
unrealized_pnl: float
class PortfolioResponse(BaseModel):
cash: float
equity: float
positions: List[PositionModel]
class OrderResponse(BaseModel):
order_id: int
symbol: str
side: Side
qty: float
fill_price: float
status: str
ts: int
class BacktestRequest(BaseModel):
symbol: str
fast: int = Field(10, ge=2, le=500)
slow: int = Field(30, ge=3, le=1000)
qty: float = Field(1.0, gt=0)
fee_bps: float = Field(1.0, ge=0.0, le=1000.0)
slippage_bps: float = Field(0.5, ge=0.0, le=1000.0)
@field_validator("slow")
@classmethod
def validate_windows(cls, v: int, info: ValidationInfo) -> int:
fast = info.data.get("fast")
if fast is not None and v <= fast:
raise ValueError("slow must be greater than fast")
return v
class TradeModel(BaseModel):
entry_ts: int
exit_ts: int
side: Side
qty: float
entry_price: float
exit_price: float
pnl: float
class BacktestResponse(BaseModel):
symbol: str
trades: int
win_rate: float
total_pnl: float
avg_pnl: float
sharpe_like: float
equity_curve: List[float]
trade_log: List[TradeModel]The rest of the architecture will work around these contracts.
In the next snippet we deal with internal machine memory. The schema layer tells clients what the API accepts and returns. The state layer tells the engine what it holds. The async lock is also part of this state because the app has concurrent flows: background market updates and incoming requests. That shared state must be protected.
from __future__ import annotations
import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict
from app.schemas.market import Bar
@dataclass
class PositionState:
qty: float = 0.0
avg_price: float = 0.0
@dataclass
class EngineState:
bars: Dict[str, Deque[Bar]] = field(default_factory=dict)
positions: Dict[str, PositionState] = field(default_factory=dict)
cash: float = 100_000.0
next_order_id: int = 1
lock: asyncio.Lock = field(default_factory=asyncio.Lock)This is one of the most important boundaries in the whole design. It prevents the internal engine memory from getting confused with the external API contract.
Now the adapter becomes the place where the market data is normalized into the internal Bar contract. Here we used synthetic data but later it can be any data provider. In this example, the engine should only consume Bar objects. That is the point of the adapter boundary.
from __future__ import annotations
import time
from collections import deque
from typing import Deque, Dict, List
import numpy as np
from app.schemas.market import Bar
class SyntheticMarketAdapter:
def __init__(
self,
symbols: List[str],
max_bars: int = 2000,
history_seed: int = 7,
live_seed: int = 42,
):
self.symbols = symbols
self.max_bars = max_bars
self.history_seed = history_seed
self.live_rng = np.random.default_rng(live_seed)
def seed_history(self) -> Dict[str, Deque[Bar]]:
bars_by_symbol: Dict[str, Deque[Bar]] = {
s: deque(maxlen=self.max_bars) for s in self.symbols
}
now = int(time.time()) - 300
rng = np.random.default_rng(self.history_seed)
for symbol in self.symbols:
base = 100.0 if symbol == "SPY" else 400.0 if symbol == "QQQ" else 1.10
prices = [base]
for _ in range(300):
shock = rng.normal(0.0, 0.25 if symbol != "EURUSD" else 0.002)
prices.append(max(0.1, prices[-1] + shock))
for i in range(1, len(prices)):
o = prices[i - 1]
c = prices[i]
spread = abs(c - o) + abs(rng.normal(0.0, 0.15))
h = max(o, c) + spread * 0.5
l = min(o, c) - spread * 0.5
v = float(abs(rng.normal(1000, 150)))
bars_by_symbol[symbol].append(
Bar(
ts=now + i,
open=float(o),
high=float(h),
low=float(l),
close=float(c),
volume=v,
)
)
return bars_by_symbol
def next_bar(self, symbol: str, last: Bar) -> Bar:
ts = int(time.time())
drift = 0.02 if symbol == "SPY" else 0.03 if symbol == "QQQ" else 0.0
sigma = 0.35 if symbol != "EURUSD" else 0.003
ret = drift + self.live_rng.normal(0.0, sigma)
close = max(0.1, last.close + ret)
spread = abs(close - last.close) + abs(self.live_rng.normal(0.0, sigma * 0.5))
return Bar(
ts=ts,
open=float(last.close),
high=float(max(last.close, close) + spread * 0.25),
low=float(min(last.close, close) - spread * 0.25),
close=float(close),
volume=float(abs(self.live_rng.normal(1200, 200))),
)The two methods play different roles. seed_history() creates an initial in-memory history so the app has something to work with. next_bar() generates the next event in the stream.
The next snippet is the center of the prototype. It is the service layer. Its job is to hold the actual trading logic and state transitions: retrieve bars, compute the latest price, generate a signal, simulate orders, compute portfolio state, and run the backtest. It should know nothing about HTTP status codes, route decorators, or WebSocket handshakes.
from __future__ import annotations
import asyncio
import math
from typing import List
import numpy as np
from app.adapters.synthetic_market import SyntheticMarketAdapter
from app.domain.enums import Side
from app.domain.state import EngineState, PositionState
from app.schemas.market import Bar
from app.schemas.trading import (
BacktestRequest,
BacktestResponse,
OrderRequest,
OrderResponse,
PortfolioResponse,
PositionModel,
SignalResponse,
TradeModel,
)
class TradingEngine:
"""
Prototype trading engine:
- stores bars in memory
- computes a simple SMA signal
- simulates orders
- maintains portfolio state
- runs a basic next-open backtest
"""
def __init__(self, adapter: SyntheticMarketAdapter, starting_cash: float = 100_000.0):
self.adapter = adapter
self.symbols = adapter.symbols
bars = self.adapter.seed_history()
self.state = EngineState(
bars=bars,
positions={s: PositionState() for s in self.symbols},
cash=starting_cash,
)
async def update_market(self) -> None:
while True:
async with self.state.lock:
for symbol, bars in self.state.bars.items():
last = bars[-1]
new_bar = self.adapter.next_bar(symbol, last)
bars.append(new_bar)
await asyncio.sleep(1.0)
def _check_symbol(self, symbol: str) -> None:
if symbol not in self.state.bars:
raise ValueError(f"unsupported symbol: {symbol}")
def get_bars(self, symbol: str, limit: int = 200) -> List[Bar]:
self._check_symbol(symbol)
return list(self.state.bars[symbol])[-limit:]
def latest_price(self, symbol: str) -> float:
self._check_symbol(symbol)
return float(self.state.bars[symbol][-1].close)
def signal_sma(self, symbol: str, fast: int, slow: int) -> SignalResponse:
bars = self.get_bars(symbol, limit=slow + 5)
closes = np.array([b.close for b in bars], dtype=np.float64)
if closes.size < slow:
raise ValueError("not enough bars")
fast_sma = float(np.mean(closes[-fast:]))
slow_sma = float(np.mean(closes[-slow:]))
signal = 1 if fast_sma > slow_sma else -1 if fast_sma < slow_sma else 0
return SignalResponse(
symbol=symbol,
signal=signal,
fast_sma=fast_sma,
slow_sma=slow_sma,
last_close=float(closes[-1]),
ts=bars[-1].ts,
)
async def simulate_order(self, request: OrderRequest) -> OrderResponse:
self._check_symbol(request.symbol)
async with self.state.lock:
fill_price = float(request.price or self.latest_price(request.symbol))
notional = request.qty * fill_price
position = self.state.positions[request.symbol]
if request.side == Side.BUY:
if self.state.cash < notional:
raise ValueError("insufficient cash")
new_qty = position.qty + request.qty
if new_qty <= 0:
raise ValueError("invalid resulting quantity")
position.avg_price = (
(position.qty * position.avg_price + request.qty * fill_price) / new_qty
if position.qty > 0
else fill_price
)
position.qty = new_qty
self.state.cash -= notional
else:
if position.qty < request.qty:
raise ValueError("not enough inventory to sell")
position.qty -= request.qty
self.state.cash += notional
if math.isclose(position.qty, 0.0, abs_tol=1e-12):
position.qty = 0.0
position.avg_price = 0.0
order_id = self.state.next_order_id
self.state.next_order_id += 1
return OrderResponse(
order_id=order_id,
symbol=request.symbol,
side=request.side,
qty=request.qty,
fill_price=fill_price,
status="FILLED",
ts=self.state.bars[request.symbol][-1].ts,
)
def portfolio(self) -> PortfolioResponse:
positions_out: List[PositionModel] = []
equity = self.state.cash
for symbol, pos in self.state.positions.items():
if pos.qty <= 0:
continue
market = self.latest_price(symbol)
unreal = (market - pos.avg_price) * pos.qty
equity += pos.qty * market
positions_out.append(
PositionModel(
symbol=symbol,
qty=pos.qty,
avg_price=pos.avg_price,
market_price=market,
unrealized_pnl=unreal,
)
)
return PortfolioResponse(
cash=self.state.cash,
equity=equity,
positions=positions_out,
)
def backtest_sma(self, request: BacktestRequest) -> BacktestResponse:
bars = self.get_bars(request.symbol, limit=1000)
closes = np.array([b.close for b in bars], dtype=np.float64)
opens = np.array([b.open for b in bars], dtype=np.float64)
ts = np.array([b.ts for b in bars], dtype=np.int64)
if closes.size <= request.slow + 2:
raise ValueError("not enough bars for backtest")
fast_ma = self._rolling_mean(closes, request.fast)
slow_ma = self._rolling_mean(closes, request.slow)
offset_fast = request.fast - 1
offset_slow = request.slow - 1
signal = np.zeros(closes.size, dtype=np.int8)
for i in range(closes.size):
if i < offset_fast or i < offset_slow:
continue
f = fast_ma[i - offset_fast]
s = slow_ma[i - offset_slow]
signal[i] = 1 if f > s else -1 if f < s else 0
position = 0
entry_price = 0.0
entry_ts = 0
trade_log: List[TradeModel] = []
equity_curve = [0.0]
cumulative_pnl = 0.0
fee = request.fee_bps / 10_000.0
slip = request.slippage_bps / 10_000.0
for i in range(1, closes.size - 1):
prev_sig = signal[i - 1]
curr_sig = signal[i]
next_open = opens[i + 1]
bullish_cross = prev_sig <= 0 and curr_sig > 0
bearish_cross = prev_sig >= 0 and curr_sig < 0
if position == 0 and bullish_cross:
position = 1
entry_price = next_open * (1.0 + slip + fee)
entry_ts = int(ts[i + 1])
elif position == 1 and bearish_cross:
exit_price = next_open * (1.0 - slip - fee)
pnl = (exit_price - entry_price) * request.qty
cumulative_pnl += pnl
equity_curve.append(cumulative_pnl)
trade_log.append(
TradeModel(
entry_ts=entry_ts,
exit_ts=int(ts[i + 1]),
side=Side.BUY,
qty=request.qty,
entry_price=float(entry_price),
exit_price=float(exit_price),
pnl=float(pnl),
)
)
position = 0
entry_price = 0.0
entry_ts = 0
if position == 1:
exit_price = closes[-1] * (1.0 - slip - fee)
pnl = (exit_price - entry_price) * request.qty
cumulative_pnl += pnl
equity_curve.append(cumulative_pnl)
trade_log.append(
TradeModel(
entry_ts=entry_ts,
exit_ts=int(ts[-1]),
side=Side.BUY,
qty=request.qty,
entry_price=float(entry_price),
exit_price=float(exit_price),
pnl=float(pnl),
)
)
pnls = np.array([t.pnl for t in trade_log], dtype=np.float64)
trades = int(pnls.size)
win_rate = float(np.mean(pnls > 0)) if trades > 0 else 0.0
avg_pnl = float(np.mean(pnls)) if trades > 0 else 0.0
sharpe_like = float(np.mean(pnls) / (np.std(pnls) + 1e-12)) if trades > 1 else 0.0
return BacktestResponse(
symbol=request.symbol,
trades=trades,
win_rate=win_rate,
total_pnl=float(np.sum(pnls)) if trades > 0 else 0.0,
avg_pnl=avg_pnl,
sharpe_like=sharpe_like,
equity_curve=[float(x) for x in equity_curve],
trade_log=trade_log,
)
@staticmethod
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
w = np.ones(window, dtype=np.float64) / window
return np.convolve(x, w, mode="valid")The constructor receives the adapter and uses it to build the initial state. That means the engine never decides how the data source works. update_market() runs forever and appends one new bar per symbol, but the logic for generating that bar lives in the adapter. signal_sma() turns market state into a typed signal response. simulate_order() mutates positions and cash under the lock so state transitions stay consistent. portfolio() turns internal state into a public response model. backtest_sma() uses the same market objects and similar decision surface as the live part of the prototype, which is a good habit because it keeps the research surface and the API surface aligned.
Now we link the app and the routers. Instead of importing a mutable global engine, routers ask FastAPI for the engine through a dependency. That is a cleaner pattern because it makes the engine app-scoped rather than module-scoped.
from fastapi import Request
from app.services.trading_engine import TradingEngine
def get_engine(request: Request) -> TradingEngine:
return request.app.state.engineThis router exposes the smallest possible operational surface: is the app alive, which symbols does the engine know about, and what time is it on the server. APIRouter exists to group related path operations into separate files and include them later in the main app.
import time
from fastapi import APIRouter, Depends
from app.dependencies import get_engine
from app.services.trading_engine import TradingEngine
router = APIRouter(tags=["health"])
@router.get("/health")
async def health(engine: TradingEngine = Depends(get_engine)):
return {
"status": "ok",
"symbols": engine.symbols,
"server_time": int(time.time()),
}The important thing is to know how the market is updated.
This router exposes the market-data endpoint. It receives the symbol and limit from the HTTP request, asks the engine for data, translates an internal ValueError into an HTTP exception, and returns a typed response. That is the right shape for a transport layer: receive, delegate, map errors, return.
from fastapi import APIRouter, Depends, HTTPException
from app.dependencies import get_engine
from app.schemas.market import BarsResponse
from app.services.trading_engine import TradingEngine
router = APIRouter(tags=["market"])
@router.get("/bars/{symbol}", response_model=BarsResponse)
async def get_bars(symbol: str, limit: int = 200, engine: TradingEngine = Depends(get_engine)):
try:
bars = engine.get_bars(symbol, limit=limit)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e)) from e
return BarsResponse(symbol=symbol, bars=bars)This covers signal generation, order simulation, portfolio state, and backtest execution. Each route receives a typed request, obtains the engine through the dependency layer, delegates the logic, and returns a typed response. The routers stay thin because all financial meaning lives in the service layer.
from fastapi import APIRouter, Depends, HTTPException
from app.dependencies import get_engine
from app.schemas.trading import (
BacktestRequest,
BacktestResponse,
OrderRequest,
OrderResponse,
PortfolioResponse,
SignalRequest,
SignalResponse,
)
from app.services.trading_engine import TradingEngine
router = APIRouter(tags=["trading"])
@router.post("/signal/run", response_model=SignalResponse)
async def run_signal(request: SignalRequest, engine: TradingEngine = Depends(get_engine)):
try:
return engine.signal_sma(request.symbol, request.fast, request.slow)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@router.post("/orders/simulate", response_model=OrderResponse)
async def simulate_order(request: OrderRequest, engine: TradingEngine = Depends(get_engine)):
try:
return await engine.simulate_order(request)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@router.get("/portfolio", response_model=PortfolioResponse)
async def get_portfolio(engine: TradingEngine = Depends(get_engine)):
return engine.portfolio()
@router.post("/backtest/run", response_model=BacktestResponse)
async def run_backtest(request: BacktestRequest, engine: TradingEngine = Depends(get_engine)):
try:
return engine.backtest_sma(request)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from eFastAPI also supports WebSockets as a first-class transport. This route exposes a live stream of the latest bar for a symbol. It reads the engine from websocket.app.state, validates the symbol, accepts the connection, and pushes one JSON payload per second. This is a different transport model from HTTP, so it deserves its own router file.
import asyncio
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
router = APIRouter(tags=["stream"])
@router.websocket("/ws/{symbol}")
async def ws_prices(websocket: WebSocket, symbol: str):
engine = websocket.app.state.engine
try:
engine._check_symbol(symbol)
except ValueError:
await websocket.close(code=1008)
return
await websocket.accept()
try:
while True:
bar = engine.get_bars(symbol, limit=1)[0]
await websocket.send_json(
{
"symbol": symbol,
"ts": bar.ts,
"price": bar.close,
"open": bar.open,
"high": bar.high,
"low": bar.low,
"volume": bar.volume,
}
)
await asyncio.sleep(1.0)
except WebSocketDisconnect:
returnFastAPI recommends lifespan for startup and shutdown logic, and it makes clear that if you use lifespan, the old startup and shutdown event handlers are no longer called. This is the right place to build the adapter, build the engine, start the background market task, attach both to the app state, and cancel the task when the process shuts down.
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.adapters.synthetic_market import SyntheticMarketAdapter
from app.services.trading_engine import TradingEngine
@asynccontextmanager
async def lifespan(app: FastAPI):
adapter = SyntheticMarketAdapter(symbols=["SPY", "QQQ", "EURUSD"])
engine = TradingEngine(adapter=adapter)
market_task = asyncio.create_task(engine.update_market())
app.state.engine = engine
app.state.market_task = market_task
try:
yield
finally:
market_task.cancel()
try:
await market_task
except asyncio.CancelledError:
passThis final part assembles the application. It creates the FastAPI app, attaches metadata, includes the routers, and defines the local Uvicorn entrypoint.
from fastapi import FastAPI
from app.core.lifespan import lifespan
from app.routers import health, market, stream, trading
def create_app() -> FastAPI:
app = FastAPI(
title="Prototype Quant Trading API",
description="Reference architecture for a prototyping trading API",
lifespan=lifespan,
)
app.include_router(health.router)
app.include_router(market.router)
app.include_router(trading.router)
app.include_router(stream.router)
return app
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="127.0.0.1", port=8000, reload=True)Okay, let’s execute this
INFO: Started server process [252]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [252]Cool! This is alive! In practice, the next step for most quant teams is to define a narrow internal API with a few canonical models and a few endpoints. Once those are stable, extra layers can be added without changing the meaning of the core objects.
A final misconception is worth removing. Good API design is quantitative thinking applied to state, observability, and execution. That is why the API layer matters, why financial APIs must be handled with more skepticism than most research pipelines apply, and why designing your own internal API is one of the most practical acts for a systematic trader.
Cool guys! Good job today! Remember that you have the full code in the appendix so you can torture it. Time to say good bye! Stay sharp, stay bold, stay unshakeable 📈
PS: What do you value more? Downloading data from an API or having it plotted?
This is an invitation-only access to our QUANT COMMUNITY, so we verify numbers to avoid spammers and scammers. Feel free to join or decline at any time. Tap the WhatsApp icon below to join
Appendix
Full code:
from __future__ import annotations
import asyncio
import math
import time
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from enum import Enum
from typing import Deque, Dict, List, Optional
import numpy as np
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator
# DOMAIN
class Side(str, Enum):
BUY = "BUY"
SELL = "SELL"
# SCHEMAS
class Bar(BaseModel):
ts: int = Field(..., description="Unix timestamp in seconds")
open: float
high: float
low: float
close: float
volume: float
class BarsResponse(BaseModel):
symbol: str
bars: List[Bar]
class SignalRequest(BaseModel):
symbol: str
fast: int = Field(5, ge=2, le=500)
slow: int = Field(20, ge=3, le=1000)
@field_validator("slow")
@classmethod
def validate_windows(cls, v: int, info: ValidationInfo) -> int:
fast = info.data.get("fast")
if fast is not None and v <= fast:
raise ValueError("slow must be greater than fast")
return v
class SignalResponse(BaseModel):
symbol: str
signal: int
fast_sma: float
slow_sma: float
last_close: float
ts: int
class OrderRequest(BaseModel):
model_config = ConfigDict(validate_assignment=True)
symbol: str
side: Side
qty: float = Field(..., gt=0)
price: Optional[float] = Field(None, gt=0)
class PositionModel(BaseModel):
symbol: str
qty: float
avg_price: float
market_price: float
unrealized_pnl: float
class PortfolioResponse(BaseModel):
cash: float
equity: float
positions: List[PositionModel]
class OrderResponse(BaseModel):
order_id: int
symbol: str
side: Side
qty: float
fill_price: float
status: str
ts: int
class BacktestRequest(BaseModel):
symbol: str
fast: int = Field(10, ge=2, le=500)
slow: int = Field(30, ge=3, le=1000)
qty: float = Field(1.0, gt=0)
fee_bps: float = Field(1.0, ge=0.0, le=1000.0)
slippage_bps: float = Field(0.5, ge=0.0, le=1000.0)
@field_validator("slow")
@classmethod
def validate_windows(cls, v: int, info: ValidationInfo) -> int:
fast = info.data.get("fast")
if fast is not None and v <= fast:
raise ValueError("slow must be greater than fast")
return v
class TradeModel(BaseModel):
entry_ts: int
exit_ts: int
side: Side
qty: float
entry_price: float
exit_price: float
pnl: float
class BacktestResponse(BaseModel):
symbol: str
trades: int
win_rate: float
total_pnl: float
avg_pnl: float
sharpe_like: float
equity_curve: List[float]
trade_log: List[TradeModel]
# INTERNAL STATE
@dataclass
class PositionState:
qty: float = 0.0
avg_price: float = 0.0
@dataclass
class EngineState:
bars: Dict[str, Deque[Bar]] = field(default_factory=dict)
positions: Dict[str, PositionState] = field(default_factory=dict)
cash: float = 100_000.0
next_order_id: int = 1
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
# ADAPTER
class SyntheticMarketAdapter:
def __init__(
self,
symbols: List[str],
max_bars: int = 2000,
history_seed: int = 7,
live_seed: int = 42,
):
self.symbols = symbols
self.max_bars = max_bars
self.history_seed = history_seed
self.live_rng = np.random.default_rng(live_seed)
def seed_history(self) -> Dict[str, Deque[Bar]]:
bars_by_symbol: Dict[str, Deque[Bar]] = {
s: deque(maxlen=self.max_bars) for s in self.symbols
}
now = int(time.time()) - 300
rng = np.random.default_rng(self.history_seed)
for symbol in self.symbols:
base = 100.0 if symbol == "SPY" else 400.0 if symbol == "QQQ" else 1.10
prices = [base]
for _ in range(300):
shock = rng.normal(0.0, 0.25 if symbol != "EURUSD" else 0.002)
prices.append(max(0.1, prices[-1] + shock))
for i in range(1, len(prices)):
o = prices[i - 1]
c = prices[i]
spread = abs(c - o) + abs(rng.normal(0.0, 0.15))
h = max(o, c) + spread * 0.5
l = min(o, c) - spread * 0.5
v = float(abs(rng.normal(1000, 150)))
bars_by_symbol[symbol].append(
Bar(
ts=now + i,
open=float(o),
high=float(h),
low=float(l),
close=float(c),
volume=v,
)
)
return bars_by_symbol
def next_bar(self, symbol: str, last: Bar) -> Bar:
ts = int(time.time())
drift = 0.02 if symbol == "SPY" else 0.03 if symbol == "QQQ" else 0.0
sigma = 0.35 if symbol != "EURUSD" else 0.003
ret = drift + self.live_rng.normal(0.0, sigma)
close = max(0.1, last.close + ret)
spread = abs(close - last.close) + abs(self.live_rng.normal(0.0, sigma * 0.5))
return Bar(
ts=ts,
open=float(last.close),
high=float(max(last.close, close) + spread * 0.25),
low=float(min(last.close, close) - spread * 0.25),
close=float(close),
volume=float(abs(self.live_rng.normal(1200, 200))),
)
# SERVICE LAYER
class TradingEngine:
"""
Prototype trading engine:
- stores bars in memory
- computes a simple SMA signal
- simulates orders
- maintains portfolio state
- runs a basic next-open backtest
"""
def __init__(self, adapter: SyntheticMarketAdapter, starting_cash: float = 100_000.0):
self.adapter = adapter
self.symbols = adapter.symbols
bars = self.adapter.seed_history()
self.state = EngineState(
bars=bars,
positions={s: PositionState() for s in self.symbols},
cash=starting_cash,
)
async def update_market(self) -> None:
while True:
async with self.state.lock:
for symbol, bars in self.state.bars.items():
last = bars[-1]
new_bar = self.adapter.next_bar(symbol, last)
bars.append(new_bar)
await asyncio.sleep(1.0)
def _check_symbol(self, symbol: str) -> None:
if symbol not in self.state.bars:
raise ValueError(f"unsupported symbol: {symbol}")
def get_bars(self, symbol: str, limit: int = 200) -> List[Bar]:
self._check_symbol(symbol)
return list(self.state.bars[symbol])[-limit:]
def latest_price(self, symbol: str) -> float:
self._check_symbol(symbol)
return float(self.state.bars[symbol][-1].close)
def signal_sma(self, symbol: str, fast: int, slow: int) -> SignalResponse:
bars = self.get_bars(symbol, limit=slow + 5)
closes = np.array([b.close for b in bars], dtype=np.float64)
if closes.size < slow:
raise ValueError("not enough bars")
fast_sma = float(np.mean(closes[-fast:]))
slow_sma = float(np.mean(closes[-slow:]))
signal = 1 if fast_sma > slow_sma else -1 if fast_sma < slow_sma else 0
return SignalResponse(
symbol=symbol,
signal=signal,
fast_sma=fast_sma,
slow_sma=slow_sma,
last_close=float(closes[-1]),
ts=bars[-1].ts,
)
async def simulate_order(self, request: OrderRequest) -> OrderResponse:
self._check_symbol(request.symbol)
async with self.state.lock:
fill_price = float(request.price or self.latest_price(request.symbol))
notional = request.qty * fill_price
position = self.state.positions[request.symbol]
if request.side == Side.BUY:
if self.state.cash < notional:
raise ValueError("insufficient cash")
new_qty = position.qty + request.qty
if new_qty <= 0:
raise ValueError("invalid resulting quantity")
position.avg_price = (
(position.qty * position.avg_price + request.qty * fill_price) / new_qty
if position.qty > 0
else fill_price
)
position.qty = new_qty
self.state.cash -= notional
else:
if position.qty < request.qty:
raise ValueError("not enough inventory to sell")
position.qty -= request.qty
self.state.cash += notional
if math.isclose(position.qty, 0.0, abs_tol=1e-12):
position.qty = 0.0
position.avg_price = 0.0
order_id = self.state.next_order_id
self.state.next_order_id += 1
return OrderResponse(
order_id=order_id,
symbol=request.symbol,
side=request.side,
qty=request.qty,
fill_price=fill_price,
status="FILLED",
ts=self.state.bars[request.symbol][-1].ts,
)
def portfolio(self) -> PortfolioResponse:
positions_out: List[PositionModel] = []
equity = self.state.cash
for symbol, pos in self.state.positions.items():
if pos.qty <= 0:
continue
market = self.latest_price(symbol)
unreal = (market - pos.avg_price) * pos.qty
equity += pos.qty * market
positions_out.append(
PositionModel(
symbol=symbol,
qty=pos.qty,
avg_price=pos.avg_price,
market_price=market,
unrealized_pnl=unreal,
)
)
return PortfolioResponse(
cash=self.state.cash,
equity=equity,
positions=positions_out,
)
def backtest_sma(self, request: BacktestRequest) -> BacktestResponse:
bars = self.get_bars(request.symbol, limit=1000)
closes = np.array([b.close for b in bars], dtype=np.float64)
opens = np.array([b.open for b in bars], dtype=np.float64)
ts = np.array([b.ts for b in bars], dtype=np.int64)
if closes.size <= request.slow + 2:
raise ValueError("not enough bars for backtest")
fast_ma = self._rolling_mean(closes, request.fast)
slow_ma = self._rolling_mean(closes, request.slow)
offset_fast = request.fast - 1
offset_slow = request.slow - 1
signal = np.zeros(closes.size, dtype=np.int8)
for i in range(closes.size):
if i < offset_fast or i < offset_slow:
continue
f = fast_ma[i - offset_fast]
s = slow_ma[i - offset_slow]
signal[i] = 1 if f > s else -1 if f < s else 0
position = 0
entry_price = 0.0
entry_ts = 0
trade_log: List[TradeModel] = []
equity_curve = [0.0]
cumulative_pnl = 0.0
fee = request.fee_bps / 10_000.0
slip = request.slippage_bps / 10_000.0
for i in range(1, closes.size - 1):
prev_sig = signal[i - 1]
curr_sig = signal[i]
next_open = opens[i + 1]
bullish_cross = prev_sig <= 0 and curr_sig > 0
bearish_cross = prev_sig >= 0 and curr_sig < 0
if position == 0 and bullish_cross:
position = 1
entry_price = next_open * (1.0 + slip + fee)
entry_ts = int(ts[i + 1])
elif position == 1 and bearish_cross:
exit_price = next_open * (1.0 - slip - fee)
pnl = (exit_price - entry_price) * request.qty
cumulative_pnl += pnl
equity_curve.append(cumulative_pnl)
trade_log.append(
TradeModel(
entry_ts=entry_ts,
exit_ts=int(ts[i + 1]),
side=Side.BUY,
qty=request.qty,
entry_price=float(entry_price),
exit_price=float(exit_price),
pnl=float(pnl),
)
)
position = 0
entry_price = 0.0
entry_ts = 0
if position == 1:
exit_price = closes[-1] * (1.0 - slip - fee)
pnl = (exit_price - entry_price) * request.qty
cumulative_pnl += pnl
equity_curve.append(cumulative_pnl)
trade_log.append(
TradeModel(
entry_ts=entry_ts,
exit_ts=int(ts[-1]),
side=Side.BUY,
qty=request.qty,
entry_price=float(entry_price),
exit_price=float(exit_price),
pnl=float(pnl),
)
)
pnls = np.array([t.pnl for t in trade_log], dtype=np.float64)
trades = int(pnls.size)
win_rate = float(np.mean(pnls > 0)) if trades > 0 else 0.0
avg_pnl = float(np.mean(pnls)) if trades > 0 else 0.0
sharpe_like = float(np.mean(pnls) / (np.std(pnls) + 1e-12)) if trades > 1 else 0.0
return BacktestResponse(
symbol=request.symbol,
trades=trades,
win_rate=win_rate,
total_pnl=float(np.sum(pnls)) if trades > 0 else 0.0,
avg_pnl=avg_pnl,
sharpe_like=sharpe_like,
equity_curve=[float(x) for x in equity_curve],
trade_log=trade_log,
)
@staticmethod
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
w = np.ones(window, dtype=np.float64) / window
return np.convolve(x, w, mode="valid")
# DEPENDENCY
def get_engine(request: Request) -> TradingEngine:
return request.app.state.engine
# ROUTERS
health_router = APIRouter(tags=["health"])
market_router = APIRouter(tags=["market"])
trading_router = APIRouter(tags=["trading"])
stream_router = APIRouter(tags=["stream"])
@health_router.get("/health")
async def health(engine: TradingEngine = Depends(get_engine)):
return {
"status": "ok",
"symbols": engine.symbols,
"server_time": int(time.time()),
}
@market_router.get("/bars/{symbol}", response_model=BarsResponse)
async def get_bars(symbol: str, limit: int = 200, engine: TradingEngine = Depends(get_engine)):
try:
bars = engine.get_bars(symbol, limit=limit)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e)) from e
return BarsResponse(symbol=symbol, bars=bars)
@trading_router.post("/signal/run", response_model=SignalResponse)
async def run_signal(request: SignalRequest, engine: TradingEngine = Depends(get_engine)):
try:
return engine.signal_sma(request.symbol, request.fast, request.slow)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@trading_router.post("/orders/simulate", response_model=OrderResponse)
async def simulate_order(request: OrderRequest, engine: TradingEngine = Depends(get_engine)):
try:
return await engine.simulate_order(request)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@trading_router.get("/portfolio", response_model=PortfolioResponse)
async def get_portfolio(engine: TradingEngine = Depends(get_engine)):
return engine.portfolio()
@trading_router.post("/backtest/run", response_model=BacktestResponse)
async def run_backtest(request: BacktestRequest, engine: TradingEngine = Depends(get_engine)):
try:
return engine.backtest_sma(request)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@stream_router.websocket("/ws/{symbol}")
async def ws_prices(websocket: WebSocket, symbol: str):
engine: TradingEngine = websocket.scope["app"].state.engine
try:
engine._check_symbol(symbol)
except ValueError:
await websocket.close(code=1008)
return
await websocket.accept()
try:
while True:
bar = engine.get_bars(symbol, limit=1)[0]
await websocket.send_json(
{
"symbol": symbol,
"ts": bar.ts,
"price": bar.close,
"open": bar.open,
"high": bar.high,
"low": bar.low,
"volume": bar.volume,
}
)
await asyncio.sleep(1.0)
except WebSocketDisconnect:
return
# LIFESPAN
@asynccontextmanager
async def lifespan(app: FastAPI):
adapter = SyntheticMarketAdapter(symbols=["SPY", "QQQ", "EURUSD"])
engine = TradingEngine(adapter=adapter)
market_task = asyncio.create_task(engine.update_market())
app.state.engine = engine
app.state.market_task = market_task
try:
yield
finally:
market_task.cancel()
try:
await market_task
except asyncio.CancelledError:
pass
# APP ASSEMBLY
def create_app() -> FastAPI:
app = FastAPI(
title="Prototype Quant Trading API",
description="Single-file trading API for testing",
lifespan=lifespan,
)
app.include_router(health_router)
app.include_router(market_router)
app.include_router(trading_router)
app.include_router(stream_router)
return app
app = create_app()
# MAIN
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)




















