Streaming (FPSS)

ThetaDataDx ships an FPSS (Feed Processing Streaming Server) client: persistent TLS/TCP connection, SPKI certificate pinning, delta-decompressed FIT frames, and an SPSC ring buffer for event dispatch.

This page covers the streaming model at the Getting Started level. For event shapes, reconnection semantics, latency measurement, and per-SDK method references, see the dedicated Real-Time Streaming section.

Architecture

Events are decoded from the FIT wire format and delta-decompressed on a dedicated I/O thread, then dispatched through an LMAX Disruptor SPSC ring buffer to either your callback (push mode) or a pull-mode iterator that you drive yourself. Every data event carries a received_at_ns timestamp, in nanoseconds, captured at frame decode time.
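
The flow described above can be sketched with standard-library pieces only. This is an illustration under stated assumptions, not the client's implementation: a bounded std::sync::mpsc channel stands in for the Disruptor ring, the cumulative-sum delta decoding is a generic technique rather than the actual FIT format, and Event, now_ns, and run_pipeline are hypothetical names. The sketch also assumes a wall-clock timestamp, which this page does not confirm.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical decoded event; not a ThetaDataDx type.
#[derive(Debug)]
struct Event {
    price_cents: i64,
    received_at_ns: u128,
}

/// Wall-clock nanoseconds since the Unix epoch (assumed clock source).
fn now_ns() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before 1970")
        .as_nanos()
}

/// Decodes delta-encoded prices on a dedicated thread, stamps each event at
/// decode time, and hands it to the consumer through a bounded queue.
fn run_pipeline(deltas: Vec<i64>) -> Vec<Event> {
    // Bounded channel as a stand-in for the SPSC ring buffer.
    let (tx, rx) = sync_channel::<Event>(8);

    let io_thread = thread::spawn(move || {
        let mut price_cents = 0i64;
        for delta in deltas {
            // Delta decompression: each frame is an offset from the last value.
            price_cents += delta;
            let event = Event { price_cents, received_at_ns: now_ns() };
            if tx.send(event).is_err() {
                break; // consumer went away
            }
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });

    let events: Vec<Event> = rx.iter().collect();
    io_thread.join().expect("I/O thread panicked");
    events
}

fn main() {
    for event in run_pipeline(vec![18_910, 2, -1]) {
        println!("{} cents at {} ns", event.price_cents, event.received_at_ns);
    }
}
```

Stamping on the decoding thread, before the event enters the queue, means any time the event spends waiting for a slow consumer shows up as the gap between received_at_ns and the consumer's own clock reading.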

SPKI pinning

The FPSS client pins the server's SubjectPublicKeyInfo (SPKI) digest during the TLS handshake, using a constant-time comparison. A server presenting a different public key, whether from a hostile man-in-the-middle or an accidentally swapped certificate, fails the handshake before any auth credentials leave the process.

Pins live in crates/thetadatadx/src/fpss/pinning.rs and are tested against all four production FPSS hosts. Callers do not need to configure pins; the default DirectConfig::production() wires them up.
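
To illustrate what a constant-time pin check looks like, here is a minimal sketch. It assumes a 32-byte digest (for example SHA-256 over the SPKI bytes), which this page does not state, and the pin values are placeholders. The names constant_time_eq and is_pinned are hypothetical and are not taken from pinning.rs.

```rust
/// Compares two digests without an early exit, so the running time does not
/// depend on the position of the first mismatching byte.
fn constant_time_eq(a: &[u8; 32], b: &[u8; 32]) -> bool {
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y; // accumulate differences instead of returning early
    }
    diff == 0
}

/// Returns true only if the presented digest matches one of the pins.
fn is_pinned(presented: &[u8; 32], pins: &[[u8; 32]]) -> bool {
    let mut matched = false;
    for pin in pins {
        // Non-short-circuiting `|=` so every pin is always checked.
        matched |= constant_time_eq(presented, pin);
    }
    matched
}

fn main() {
    // Placeholder pins, not real FPSS host digests.
    let pins = [[0xAAu8; 32], [0xBBu8; 32]];
    let presented = [0xBBu8; 32];
    if is_pinned(&presented, &pins) {
        println!("pin matched, continue handshake");
    } else {
        println!("pin mismatch, abort before sending credentials");
    }
}
```

A hand-written loop like this can in principle be reshaped by the optimizer, so production code usually relies on a vetted crate such as subtle rather than its own comparison.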

Dispatch model

| SDK | Push (callback) | Pull (iterator) | Event shape | Details |
| --- | --- | --- | --- | --- |
| Rust | `client.start_streaming(\|event\| ...)` | `let iter = client.start_streaming_iter()?;` then `for event in iter { ... }` | `&FpssEvent` enum | Disruptor ring dispatch. No Tokio on the hot path. |
| Python | `client.start_streaming(callback)` | `with client.streaming_iter() as it: for event in it:` | typed pyclass | Iterator raises `StopIteration` once the queue drains on a stopped session. |
| TypeScript | `client.startStreaming(callback)` | `for await (const event of client.startStreamingIter())` | JS object | Async iterable resolves `done: true` on terminal end-of-stream. |
| C++ | `client.start_streaming(lambda)` | `client.start_streaming_iter().next(timeout)` | `TdxFpssEvent` | `next(timeout)` returns `std::optional<TdxFpssEvent>`; `ended()` flips on terminal close. `#[repr(C)]` layout. |

Push and pull modes are mutually exclusive on a single session. Under the hood, both modes read from the same SPSC ring: push mode invokes your callback on the LMAX Disruptor consumer thread, while pull mode hands events to an iterator that the caller drives.
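
One way to picture why the two modes exclude each other: a single-consumer queue has exactly one consuming end, and whichever mode starts first takes it. The sketch below models that with ownership. Session, start_push, and start_pull are hypothetical stand-ins built on std::sync::mpsc, not the ThetaDataDx API, and here the stream ends when the sender is dropped rather than when a session is stopped.

```rust
use std::sync::mpsc::{channel, IntoIter, Receiver, Sender};
use std::thread::{self, JoinHandle};

const BUSY: &str = "session already has a consumer";

/// Hypothetical session; the single Receiver models the ring's consumer side.
struct Session {
    rx: Option<Receiver<u64>>,
}

impl Session {
    fn new() -> (Sender<u64>, Session) {
        let (tx, rx) = channel();
        (tx, Session { rx: Some(rx) })
    }

    /// Push mode: a consumer thread drains the queue into `callback`.
    fn start_push<F>(&mut self, mut callback: F) -> Result<JoinHandle<()>, &'static str>
    where
        F: FnMut(u64) + Send + 'static,
    {
        let rx = self.rx.take().ok_or(BUSY)?;
        Ok(thread::spawn(move || {
            for event in rx {
                callback(event);
            }
        }))
    }

    /// Pull mode: the caller drives the returned iterator.
    fn start_pull(&mut self) -> Result<IntoIter<u64>, &'static str> {
        let rx = self.rx.take().ok_or(BUSY)?;
        Ok(rx.into_iter())
    }
}

fn main() {
    let (tx, mut session) = Session::new();
    let events = session.start_pull().expect("first consumer wins");

    // The consumer end is already taken, so push mode is refused.
    println!("push after pull rejected: {}", session.start_push(|_| {}).is_err());

    for n in 0..3 {
        tx.send(n).expect("receiver alive");
    }
    drop(tx); // end of stream in this sketch
    for event in events {
        println!("pulled {event}");
    }
}
```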

Ring buffer

  • Backing type: LMAX Disruptor SPSC with a power-of-two slot count.
  • Default: 131,072 slots. Caller-configurable at FpssClient::connect.
  • Behavior on overflow: tail-drop with a ServerError control event so the consumer sees explicit backpressure.
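
The power-of-two slot count and the tail-drop behavior can be illustrated with a small single-threaded model. This is a sketch of the general technique only: it is not lock-free, it is not the Disruptor, and Ring with its methods is a hypothetical type. Where the real client reports overflow through a ServerError control event, the sketch just counts drops.

```rust
/// Simplified, single-threaded ring; hypothetical, for illustration only.
struct Ring<T> {
    slots: Vec<Option<T>>,
    mask: usize,  // capacity - 1, valid because capacity is a power of two
    head: usize,  // sequence number of the next slot to read
    tail: usize,  // sequence number of the next slot to write
    dropped: u64, // events discarded because the ring was full
}

impl<T> Ring<T> {
    fn with_capacity(capacity: usize) -> Ring<T> {
        assert!(capacity.is_power_of_two(), "capacity must be a power of two");
        Ring {
            slots: (0..capacity).map(|_| None).collect(),
            mask: capacity - 1,
            head: 0,
            tail: 0,
            dropped: 0,
        }
    }

    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head)
    }

    /// Tail-drop: when the ring is full the incoming event is discarded and
    /// the drop is recorded; events already queued are left untouched.
    fn push(&mut self, item: T) -> bool {
        if self.len() == self.slots.len() {
            self.dropped += 1;
            return false;
        }
        // A bit mask replaces the modulo because capacity is a power of two.
        self.slots[self.tail & self.mask] = Some(item);
        self.tail = self.tail.wrapping_add(1);
        true
    }

    fn pop(&mut self) -> Option<T> {
        if self.len() == 0 {
            return None;
        }
        let item = self.slots[self.head & self.mask].take();
        self.head = self.head.wrapping_add(1);
        item
    }
}

fn main() {
    let mut ring: Ring<u32> = Ring::with_capacity(4);
    for i in 0..6 {
        ring.push(i);
    }
    println!("queued={} dropped={}", ring.len(), ring.dropped);
    while let Some(event) = ring.pop() {
        println!("event {event}");
    }
}
```

The default of 131,072 slots is 2^17, so it satisfies the same power-of-two requirement that lets the index wrap with a mask.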

Reconnect policy

ReconnectPolicy is an enum with three variants:

| Variant | Behavior |
| --- | --- |
| Auto (default) | 2 s delay on most transient reasons; 130 s delay after TooManyRequests (code 12). Gives up after 5 consecutive Disconnected(permanent) frames (bad credentials). |
| Never | No automatic retry; emits a Disconnected event and lets the caller handle it. |
| Custom(fn) | User closure fn(reason, attempt) -> Option<Duration>: return None to stop retrying, or a delay to wait before the next attempt. Enables jittered exponential backoff, per-hour budget caps, or any other policy that fits the caller's failure model. |
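
As an illustration of the Custom variant's closure shape, the function below implements a capped exponential backoff with the fn(reason, attempt) -> Option<Duration> signature. The Reason enum is a hypothetical stand-in for the crate's disconnect reasons, attempt is assumed to start at 0, and the attempt limit and 60 s cap are arbitrary choices for the example. How the function is wired into ReconnectPolicy::Custom is not shown because the exact constructor is not documented on this page.

```rust
use std::time::Duration;

/// Hypothetical disconnect reasons; the real enum lives in the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Reason {
    TimedOut,
    TooManyRequests,
    Permanent,
}

/// Capped exponential backoff: 1 s, 2 s, 4 s, ... up to 60 s.
/// Returns None to stop retrying.
fn capped_backoff(reason: Reason, attempt: u32) -> Option<Duration> {
    const MAX_ATTEMPTS: u32 = 8; // arbitrary budget for this sketch
    const CAP_SECS: u64 = 60;

    if reason == Reason::Permanent || attempt >= MAX_ATTEMPTS {
        return None;
    }
    if reason == Reason::TooManyRequests {
        // Mirrors the 130 s wait the Auto variant uses for this reason.
        return Some(Duration::from_secs(130));
    }
    // attempt < MAX_ATTEMPTS, so the shift cannot overflow.
    let secs = (1u64 << attempt).min(CAP_SECS);
    Some(Duration::from_secs(secs))
}

fn main() {
    for attempt in 0..10 {
        match capped_backoff(Reason::TimedOut, attempt) {
            Some(delay) => println!("attempt {attempt}: wait {delay:?}"),
            None => {
                println!("attempt {attempt}: give up");
                break;
            }
        }
    }
}
```

Jitter is left out to keep the sketch dependency-free; adding a random offset to each delay spreads out reconnects when many clients drop at the same time.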

Minimal example

```rust
use thetadatadx::{ThetaDataDxClient, Credentials, DirectConfig};
use thetadatadx::fpss::{FpssData, FpssEvent};
use thetadatadx::fpss::protocol::Contract;

#[tokio::main]
async fn main() -> Result<(), thetadatadx::Error> {
    // Load credentials and connect; the production config wires up the SPKI pins.
    let creds = Credentials::from_file("creds.txt")?;
    let client = ThetaDataDxClient::connect(&creds, DirectConfig::production()).await?;

    // Push mode: the callback is invoked for every dispatched event.
    client.start_streaming(|event: &FpssEvent| match event {
        FpssEvent::Data(FpssData::Quote { contract, bid, ask, .. }) => {
            println!("Quote: {} {bid:.2}/{ask:.2}", contract.symbol);
        }
        FpssEvent::Data(FpssData::Trade { contract, price, size, .. }) => {
            println!("Trade: {} {price:.2} x {size}", contract.symbol);
        }
        // Ignore control events and other data kinds in this example.
        _ => {}
    })?;

    // Subscribe to AAPL quotes and MSFT trades.
    client.subscribe(Contract::stock("AAPL").quote())?;
    client.subscribe(Contract::stock("MSFT").trade())?;

    // Park the main thread so the process stays alive while events arrive.
    std::thread::park();
    client.stop_streaming();
    Ok(())
}
```
```python
from thetadatadx import Credentials, Config, ThetaDataDxClient, Contract

creds = Credentials.from_file("creds.txt")
client = ThetaDataDxClient(creds, Config.production())

client.subscribe(Contract.stock("AAPL").quote())
client.subscribe(Contract.stock("MSFT").trade())

# Pull-iter mode: context-managed typed iterator over the SPSC
# queue. The iterator raises StopIteration once `stop_streaming()`
# fires AND the queue is fully drained; the `with` block pairs
# `stop_streaming()` + `await_drain()` automatically on exit.
with client.streaming_iter() as it:
    for event in it:
        if event.kind == "quote":
            print(f"Quote: {event.contract.symbol} {event.bid:.2f}/{event.ask:.2f}")
        elif event.kind == "trade":
            print(f"Trade: {event.contract.symbol} {event.price:.2f} x {event.size}")
```

Released under the Apache-2.0 License.