Streaming (FPSS)
ThetaDataDx ships an FPSS (Feed Processing Streaming Server) client: persistent TLS/TCP connection, SPKI certificate pinning, delta-decompressed FIT frames, and an SPSC ring buffer for event dispatch.
This page covers the streaming model at the Getting Started level. For event shapes, reconnection semantics, latency measurement, and per-SDK method references, see the dedicated Real-Time Streaming section.
Architecture
Events are decoded from the FIT wire format and delta-decompressed on a dedicated I/O thread, then dispatched through an LMAX Disruptor SPSC ring buffer to your callback (push mode) or to a pull-mode iterator the caller drives (available in all four SDKs; see the dispatch table below). Every data event carries a received_at_ns nanosecond timestamp captured at frame decode time.
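Because received_at_ns is captured at decode time, one-way decode-to-callback latency is just the difference against a clock read in your handler. A minimal sketch using only the standard library (the field name follows the text above; the helper functions are illustrative, not SDK API):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Nanoseconds since the Unix epoch, the same clock domain as
/// the `received_at_ns` stamp described above.
fn now_ns() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_nanos()
}

/// Latency between frame decode and the moment the callback observes
/// the event. Saturating so a small clock step backwards yields 0.
fn latency_ns(received_at_ns: u128) -> u128 {
    now_ns().saturating_sub(received_at_ns)
}

fn main() {
    // Simulate a stamp taken at decode time...
    let received_at_ns = now_ns();
    // ...the event then crosses the ring buffer to the consumer...
    let lag = latency_ns(received_at_ns);
    // ...where the observed lag is tiny on the same host.
    assert!(lag < 1_000_000_000);
    println!("callback latency: {lag} ns");
}
```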
SPKI pinning
The FPSS client pins the server's SubjectPublicKeyInfo (SPKI) digest on TLS handshake using constant-time comparison. A server presenting a different public key — from a hostile MITM intermediary or an accidentally swapped certificate — fails the handshake before any auth credentials leave the process.
Pins live in crates/thetadatadx/src/fpss/pinning.rs and are tested against all four production FPSS hosts. Callers do not need to configure pins; the default DirectConfig::production() wires them up.
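Constant-time comparison matters here because an early-exit byte compare leaks, via timing, how many leading bytes of a presented key's digest match the pin. The core idea can be sketched in a few lines of std-only Rust (the real check lives in pinning.rs; the 32-byte digests below are placeholders, not real SPKI hashes):

```rust
/// Compare two SPKI digests without short-circuiting on the first
/// mismatched byte, so matching and non-matching inputs take the
/// same time.
fn pin_matches(pinned: &[u8; 32], presented: &[u8; 32]) -> bool {
    let mut diff: u8 = 0;
    for (a, b) in pinned.iter().zip(presented.iter()) {
        diff |= a ^ b; // accumulate differences instead of returning early
    }
    diff == 0
}

fn main() {
    let pinned = [0xAB; 32];
    let mut mitm = pinned;
    mitm[31] ^= 0x01; // attacker's key digest differs in one byte

    assert!(pin_matches(&pinned, &pinned));
    assert!(!pin_matches(&pinned, &mitm)); // handshake fails here,
                                           // before credentials are sent
}
```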
Dispatch model
| SDK | Push (callback) | Pull (iterator) | Event shape | Details |
|---|---|---|---|---|
| Rust | client.start_streaming(|event| ...) | let iter = client.start_streaming_iter()?; then for event in iter { ... } | &FpssEvent enum | Disruptor ring dispatch. No Tokio on the hot path. |
| Python | client.start_streaming(callback) | with client.streaming_iter() as it: for event in it: | typed pyclass | Iterator raises StopIteration once the queue drains on a stopped session. |
| TypeScript | client.startStreaming(callback) | for await (const event of client.startStreamingIter()) | JS object | Async iterable resolves done: true on terminal end-of-stream. |
| C++ | client.start_streaming(lambda) | client.start_streaming_iter().next(timeout) | TdxFpssEvent | next(timeout) returns std::optional<TdxFpssEvent>; ended() flips on terminal close. #[repr(C)] layout. |
Push and pull modes are mutually exclusive on a single session. Under the hood both modes read from the same SPSC ring; push mode dispatches into a callback on the LMAX Disruptor consumer thread, pull mode hands events to an iterator the caller drives.
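The single-consumer constraint is why the two modes are exclusive: push and pull would otherwise race for the same ring slots. A toy analogue using std's bounded channel, with one I/O-style producer and one consumer driven in pull mode (everything here is illustrative standard-library code, not the SDK API):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Bounded channel stands in for the SPSC ring: exactly one
    // producer (the I/O thread) and exactly one consumer.
    let (tx, rx) = sync_channel::<u64>(8);

    let producer = thread::spawn(move || {
        for seq in 0..5 {
            tx.send(seq).unwrap(); // decode + publish into the ring
        }
        // Dropping `tx` closes the stream, like a terminal end-of-stream.
    });

    // Pull mode: the caller drives the iterator; it ends once the
    // producer hangs up AND the queue drains, mirroring the
    // StopIteration / done: true semantics in the table above.
    let received: Vec<u64> = rx.iter().collect();

    producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
```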
Ring buffer
- Backing type: LMAX Disruptor SPSC with a power-of-two slot count.
- Default: 131,072 slots; caller-configurable at FpssClient::connect.
- Behavior on overflow: tail-drop with a ServerError control event so the consumer sees explicit backpressure.
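The overflow behavior can be modeled with a bounded queue: when the ring is full, the incoming event is dropped and a single control event marks the gap so the consumer knows data was lost. A toy std-only model (slot count and event names are illustrative; the real ring is the Disruptor, not a VecDeque):

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Event {
    Data(u64),
    ServerError, // explicit backpressure signal to the consumer
}

struct Ring {
    slots: VecDeque<Event>,
    capacity: usize, // power of two in the real implementation
}

impl Ring {
    fn push(&mut self, seq: u64) {
        if self.slots.len() == self.capacity {
            // Tail-drop: discard the incoming event, but make the loss
            // visible by turning the newest slot into a control event.
            if self.slots.back() != Some(&Event::ServerError) {
                self.slots.pop_back();
                self.slots.push_back(Event::ServerError);
            }
        } else {
            self.slots.push_back(Event::Data(seq));
        }
    }
}

fn main() {
    let mut ring = Ring { slots: VecDeque::new(), capacity: 4 };
    for seq in 0..10 {
        ring.push(seq); // events 4..10 overflow the 4-slot ring
    }
    // The ring never grows past capacity, and the consumer sees an
    // explicit ServerError instead of a silent gap.
    assert_eq!(ring.slots.len(), 4);
    assert_eq!(ring.slots.back(), Some(&Event::ServerError));
}
```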
Reconnect policy
ReconnectPolicy is an enum with three variants:
| Variant | Behavior |
|---|---|
| Auto (default) | 2 s delay on most transient reasons; 130 s delay after TooManyRequests (code 12). Gives up after 5 consecutive Disconnected(permanent) frames (bad credentials). |
| Never | No automatic retry; emit a Disconnected event and let the caller handle it. |
| Custom(fn) | User closure fn(reason, attempt) -> Option<Duration> — return None to stop retrying, or a delay to wait before the next attempt. Enables jittered exponential backoff, per-hour budget caps, or whatever policy fits the caller's failure model. |
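A Custom policy is just a function from (reason, attempt) to an optional delay. A sketch of a jittered exponential backoff in that shape, using only std (the Reason enum and its variant names are illustrative stand-ins for the SDK's disconnect reasons; only the fn(reason, attempt) -> Option<Duration> contract comes from the table above):

```rust
use std::time::Duration;

/// Illustrative disconnect reasons; the real reason type is the SDK's.
#[derive(Clone, Copy)]
enum Reason {
    Transient,
    TooManyRequests,
    PermanentAuthFailure,
}

/// Exponential backoff with a cheap deterministic jitter, capped at
/// 60 s, giving up after 8 attempts. Returning `None` stops retrying.
fn backoff(reason: Reason, attempt: u32) -> Option<Duration> {
    match reason {
        Reason::PermanentAuthFailure => None, // retrying won't help
        Reason::TooManyRequests => Some(Duration::from_secs(130)),
        Reason::Transient => {
            if attempt >= 8 {
                return None; // budget exhausted
            }
            let base_ms = 500u64 * (1u64 << attempt.min(7)); // 500 ms, 1 s, 2 s, ...
            let jitter_ms = (attempt as u64 * 37) % 250; // spreads reconnect storms
            Some(Duration::from_millis(base_ms.min(60_000) + jitter_ms))
        }
    }
}

fn main() {
    assert_eq!(backoff(Reason::PermanentAuthFailure, 0), None);
    assert_eq!(backoff(Reason::TooManyRequests, 3), Some(Duration::from_secs(130)));
    assert!(backoff(Reason::Transient, 0).unwrap() >= Duration::from_millis(500));
    assert_eq!(backoff(Reason::Transient, 8), None);
}
```

The deterministic jitter keeps the example reproducible; a production policy would typically use a random jitter instead.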
Minimal example
use thetadatadx::{ThetaDataDxClient, Credentials, DirectConfig};
use thetadatadx::fpss::{FpssData, FpssEvent};
use thetadatadx::fpss::protocol::Contract;
#[tokio::main]
async fn main() -> Result<(), thetadatadx::Error> {
let creds = Credentials::from_file("creds.txt")?;
let client = ThetaDataDxClient::connect(&creds, DirectConfig::production()).await?;
client.start_streaming(|event: &FpssEvent| match event {
FpssEvent::Data(FpssData::Quote { contract, bid, ask, .. }) => {
println!("Quote: {} {bid:.2}/{ask:.2}", contract.symbol);
}
FpssEvent::Data(FpssData::Trade { contract, price, size, .. }) => {
println!("Trade: {} {price:.2} x {size}", contract.symbol);
}
_ => {}
})?;
client.subscribe(Contract::stock("AAPL").quote())?;
client.subscribe(Contract::stock("MSFT").trade())?;
    // Keep the process alive while events stream in. (thread::park()
    // would block forever and never reach stop_streaming.)
    tokio::time::sleep(std::time::Duration::from_secs(60)).await;
    client.stop_streaming();
Ok(())
}

The same session in Python, using pull-iter mode:

from thetadatadx import Credentials, Config, ThetaDataDxClient, Contract
creds = Credentials.from_file("creds.txt")
client = ThetaDataDxClient(creds, Config.production())
client.subscribe(Contract.stock("AAPL").quote())
client.subscribe(Contract.stock("MSFT").trade())
# Pull-iter mode: context-managed typed iterator over the SPSC
# queue. The iterator raises StopIteration once `stop_streaming()`
# fires AND the queue is fully drained; the `with` block pairs
# `stop_streaming()` + `await_drain()` automatically on exit.
with client.streaming_iter() as it:
    for event in it:
        if event.kind == "quote":
            print(f"Quote: {event.contract.symbol} {event.bid:.2f}/{event.ask:.2f}")
        elif event.kind == "trade":
            print(f"Trade: {event.contract.symbol} {event.price:.2f} x {event.size}")

Next
- Connecting & subscribing — server selection, flush mode, queue depth
- Handling events — every event type with full field tables
- Reconnection — reconnect() / reconnect_streaming() APIs
- Latency measurement — received_at_ns and tdbe::latency::latency_ns()