Connecting & Subscribing

Server Environments

ThetaDataDx supports three FPSS server environments:

| Config | Ports | Use case |
| --- | --- | --- |
| DirectConfig::production() | 20000/20001 | Live market data (NJ-A and NJ-B hosts) |
| DirectConfig::dev() | 20200/20201 | Replays a random historical trading day in an infinite loop at max speed. Use when markets are closed. |
| DirectConfig::stage() | 20100/20101 | Testing/staging servers. Frequent reboots, not stable. |

All three share the same MDDS (historical) production servers -- only FPSS hosts differ.

TLS & SPKI Pinning

FPSS TLS uses SPKI (Subject Public Key Info) pinning: the client hashes the public key presented by the server with SHA-256 and compares the digest, in constant time, against the pin captured from ThetaData's keypair. The pin survives certificate renewal as long as ThetaData keeps the same keypair; key rotation requires a coordinated client update. A MITM presenting a different certificate (even a valid CA-signed one) is rejected with RustlsError::General("FPSS SPKI pin mismatch ...").
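
The pin check described above can be sketched in a few lines. This is an illustration of the technique only, not the SDK's Rust implementation: the function name `spki_pin_matches` and the placeholder key bytes are assumptions made up for the example, and a real client would extract the DER-encoded SPKI from the server certificate during the TLS handshake.

```python
import hashlib
import hmac


def spki_pin_matches(spki_der: bytes, expected_pin: bytes) -> bool:
    """Return True when SHA-256(spki_der) equals the pinned digest."""
    actual = hashlib.sha256(spki_der).digest()
    # compare_digest takes time independent of where the inputs differ,
    # so the comparison does not leak how many leading bytes matched.
    return hmac.compare_digest(actual, expected_pin)


# Placeholder bytes standing in for DER-encoded SubjectPublicKeyInfo blobs.
pinned_key = b"example-spki-der-bytes"
other_key = b"different-spki-der-bytes"

# The pin is computed once from the known-good key and shipped with the client.
PIN = hashlib.sha256(pinned_key).digest()

same_key_ok = spki_pin_matches(pinned_key, PIN)   # same key, e.g. after a cert renewal
other_key_ok = spki_pin_matches(other_key, PIN)   # different key, even if CA-signed
print(same_key_ok, other_key_ok)
```

Because only the public key is hashed, a renewed certificate that reuses the keypair still matches, while any certificate built on a different key fails regardless of who signed it.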

Connect (Production)

rust
use thetadatadx::{ThetaDataDxClient, Credentials, DirectConfig};
use thetadatadx::fpss::{FpssData, FpssControl, FpssEvent};
use thetadatadx::fpss::protocol::Contract;

let creds = Credentials::from_file("creds.txt")?;
let client = ThetaDataDxClient::connect(&creds, DirectConfig::production()).await?;

client.start_streaming(|event: &FpssEvent| {
    match event {
        // Every data event carries an `Arc<Contract>` — read the symbol
        // directly, no contract-ID map lookup required.
        FpssEvent::Data(FpssData::Quote { contract, bid, ask, received_at_ns, .. }) => {
            println!("Quote: {} bid={bid:.2} ask={ask:.2} rx={received_at_ns}ns", contract.symbol);
        }
        FpssEvent::Data(FpssData::Trade { contract, price, size, received_at_ns, .. }) => {
            println!("Trade: {} price={price:.2} size={size} rx={received_at_ns}ns", contract.symbol);
        }
        FpssEvent::Control(FpssControl::ContractAssigned { id, contract }) => {
            println!("Contract {id} = {contract}");
        }
        _ => {}
    }
})?;
python
from thetadatadx import Credentials, Config, ThetaDataDxClient

creds = Credentials.from_file("creds.txt")
client = ThetaDataDxClient(creds, Config.production())

client.start_streaming(lambda event: print(event))
cpp
auto creds = tdx::Credentials::from_file("creds.txt");
auto config = tdx::Config::production();
auto client = tdx::UnifiedClient::connect(creds, config);
typescript
import { ThetaDataDxClient } from 'thetadatadx';

const client = await ThetaDataDxClient.connectFromFile('creds.txt');
client.startStreaming((event) => console.log(event));

TIP

Every binding offers two equivalent delivery modes on the unified ThetaDataDxClient: push (start_streaming(callback)) for low-latency dispatch, and pull (start_streaming_iter() in Rust/Python/C++, startStreamingIter() in TypeScript) for the iterator idiom. Pick one per session — the modes are mutually exclusive on the client.
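
To make the "one mode per session" rule concrete, here is a toy model of a client that offers both delivery modes and refuses to start the second one. `ToyStream` and its `deliver()` method are hypothetical names invented for this sketch; only the method names `start_streaming` and `start_streaming_iter` are taken from the tip above, and the real client's error type and internals are not described here.

```python
import queue
from typing import Callable, Iterator, Optional


class ToyStream:
    """Hypothetical stand-in for a streaming client; not the SDK class."""

    def __init__(self) -> None:
        self._mode: Optional[str] = None
        self._callback: Optional[Callable[[dict], None]] = None
        self._events = queue.Queue()

    def _claim(self, mode: str) -> None:
        # The first call fixes the delivery mode for the whole session.
        if self._mode is not None:
            raise RuntimeError(f"streaming already started in {self._mode} mode")
        self._mode = mode

    def start_streaming(self, callback: Callable[[dict], None]) -> None:
        """Push mode: events are handed to the callback as they arrive."""
        self._claim("push")
        self._callback = callback

    def start_streaming_iter(self) -> Iterator[dict]:
        """Pull mode: events are queued until the caller iterates."""
        self._claim("pull")

        def events() -> Iterator[dict]:
            while True:
                event = self._events.get()
                if event is None:  # sentinel: stream ended
                    return
                yield event

        return events()

    def deliver(self, event: Optional[dict]) -> None:
        """Simulated I/O side. Passing None ends a pull-mode stream."""
        if self._mode == "push":
            if event is not None:
                self._callback(event)
        else:
            self._events.put(event)


# Push: the callback runs at delivery time.
pushed = []
push_client = ToyStream()
push_client.start_streaming(pushed.append)
push_client.deliver({"kind": "quote", "symbol": "AAPL"})

# Pull: the consumer drains events at its own pace.
pull_client = ToyStream()
event_iter = pull_client.start_streaming_iter()
pull_client.deliver({"kind": "trade", "symbol": "MSFT"})
pull_client.deliver(None)
pulled = list(event_iter)
```

Calling `pull_client.start_streaming(...)` after this point raises `RuntimeError` in the sketch, mirroring the mutual exclusivity of the two modes.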

Connect (Dev Server)

The dev server replays historical data at maximum speed -- ideal for testing when markets are closed.

rust
let client = ThetaDataDxClient::connect(&creds, DirectConfig::dev()).await?;
python
client = ThetaDataDxClient(creds, Config.dev())
cpp
auto config = tdx::Config::dev();
auto client = tdx::UnifiedClient::connect(creds, config);
typescript
// Dev server config is not yet exposed in the TypeScript SDK.
// connectFromFile() always uses production config.
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Dev server trade format

The dev server sends a simplified 8-field trade format instead of the full 16-field production format. This is handled transparently by the SDK -- your code sees the same Trade event type with missing fields zeroed out.
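
As a rough picture of what "missing fields zeroed out" means, the sketch below pads a short trade record up to the full width. The field order is an assumption made purely for illustration (the example treats the 8 dev fields as the first 8 positions of the full record); the actual wire layout is not described in this section, and `normalize_trade` is a hypothetical helper, not an SDK function.

```python
from typing import List

FULL_TRADE_FIELDS = 16  # production trade width, per the text above
DEV_TRADE_FIELDS = 8    # dev server trade width, per the text above


def normalize_trade(fields: List[int]) -> List[int]:
    """Return a 16-field record, zero-filling what a dev-format trade lacks."""
    if len(fields) == FULL_TRADE_FIELDS:
        return list(fields)
    if len(fields) == DEV_TRADE_FIELDS:
        # Assumed layout: the dev fields occupy the leading positions.
        return list(fields) + [0] * (FULL_TRADE_FIELDS - DEV_TRADE_FIELDS)
    raise ValueError(f"unexpected trade width: {len(fields)}")


dev_trade = [1, 2, 3, 4, 5, 6, 7, 8]
full_trade = normalize_trade(dev_trade)
print(len(full_trade), full_trade[8:])
```

The practical consequence for user code is that a zero in a trade field on the dev server may mean "not sent" rather than a real value.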

Connect (Stage Server)

rust
let client = ThetaDataDxClient::connect(&creds, DirectConfig::stage()).await?;
python
client = ThetaDataDxClient(creds, Config.stage())
cpp
auto config = tdx::Config::stage();
auto client = tdx::UnifiedClient::connect(creds, config);
typescript
// Stage server config is not yet exposed in the TypeScript SDK.
// connectFromFile() always uses production config.
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Flush Mode

FpssFlushMode controls the latency/syscall tradeoff on the write path:

| Mode | Flush trigger | Added latency | Best for |
| --- | --- | --- | --- |
| Batched (default) | PING frames every ~100ms | Up to 100ms | Production throughput, matches Java terminal |
| Immediate | Every frame write | None | Lowest latency trading |
rust
use thetadatadx::config::FpssFlushMode;

let mut config = DirectConfig::production();
config.fpss_flush_mode = FpssFlushMode::Immediate; // lowest latency
let client = ThetaDataDxClient::connect(&creds, config).await?;
python
# Flush mode cannot currently be changed from the Python SDK.
# It defaults to Batched (flush on PING frames, ~100ms).
# Use the Rust SDK directly if you need Immediate mode.
client = ThetaDataDxClient(creds, Config.production())
cpp
auto config = tdx::Config::production();
config.set_flush_mode(tdx::FlushMode::Immediate);
auto client = tdx::UnifiedClient::connect(creds, config);
typescript
// Flush mode cannot currently be changed from the TypeScript SDK.
// It defaults to Batched (flush on PING frames, ~100ms).
const client = await ThetaDataDxClient.connectFromFile('creds.txt');
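
The tradeoff between the two modes can be seen in a small simulation that counts flushes as a stand-in for write syscalls. `FrameWriter` and the frame payloads are hypothetical and exist only for this sketch; the SDK's write path is implemented in Rust and is not shown here.

```python
import io


class FrameWriter:
    """Toy writer: buffers frames and counts flushes (stand-ins for syscalls)."""

    def __init__(self, sink: io.BytesIO, immediate: bool) -> None:
        self.sink = sink
        self.immediate = immediate
        self.pending = bytearray()
        self.flushes = 0

    def write_frame(self, frame: bytes, is_ping: bool = False) -> None:
        self.pending += frame
        # Immediate: flush on every frame. Batched: flush only on PING.
        if self.immediate or is_ping:
            self.flush()

    def flush(self) -> None:
        if self.pending:
            self.sink.write(bytes(self.pending))
            self.pending.clear()
            self.flushes += 1


# Three command frames followed by the periodic PING.
FRAMES = [(b"SUB1", False), (b"SUB2", False), (b"SUB3", False), (b"PING", True)]

batched_sink, immediate_sink = io.BytesIO(), io.BytesIO()
batched = FrameWriter(batched_sink, immediate=False)
immediate = FrameWriter(immediate_sink, immediate=True)

for frame, is_ping in FRAMES:
    batched.write_frame(frame, is_ping)
    immediate.write_frame(frame, is_ping)

print(batched.flushes, immediate.flushes)
```

Both writers emit the same bytes; the batched writer does so in one flush when the PING arrives, while the immediate writer pays one flush per frame in exchange for not waiting on the next PING.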

Custom FPSS Hosts

FPSS hosts are not hardcoded. You can override them:

rust
let mut config = DirectConfig::production();
config.fpss_hosts = vec![
    ("custom-host-a.example.com".to_string(), 20000),
    ("custom-host-b.example.com".to_string(), 20000),
];
let client = ThetaDataDxClient::connect(&creds, config).await?;

// Or parse from a comma-separated string (same format as config_0.properties):
let hosts = DirectConfig::parse_fpss_hosts("host-a:20000,host-b:20001")?;
python
# Custom hosts are configured at the Rust level via DirectConfig or
# TOML config file. Python inherits them from the config at connection time.
# Set hosts in config.toml:
#   [fpss]
#   hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
client = ThetaDataDxClient(creds, Config.production())
cpp
// Custom hosts are configured at the Rust level via DirectConfig or
// TOML config file. C++ inherits them from the config at connection time.
// Set hosts in config.toml:
//   [fpss]
//   hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
auto config = tdx::Config::production();
auto client = tdx::UnifiedClient::connect(creds, config);
typescript
// Custom hosts are configured at the Rust level via DirectConfig or
// TOML config file. TypeScript inherits them from the config at connection time.
// Set hosts in config.toml:
//   [fpss]
//   hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Or use a TOML config file (requires config-file feature):

toml
[fpss]
hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
# Or as CSV:
# hosts = "host-a.example.com:20000,host-b.example.com:20001"
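
Both host formats shown above (a list of "host:port" strings and a single comma-separated string) reduce to the same list of (host, port) pairs. The following is a minimal sketch of that normalization, assuming plain hostnames without IPv6 literals; `parse_hosts` is a hypothetical helper written for this example and is not the SDK's `parse_fpss_hosts`.

```python
from typing import List, Tuple, Union


def parse_hosts(value: Union[str, List[str]]) -> List[Tuple[str, int]]:
    """Accept a CSV string or a list of 'host:port' entries."""
    entries = value.split(",") if isinstance(value, str) else value
    hosts: List[Tuple[str, int]] = []
    for entry in entries:
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and blank entries
        host, sep, port = entry.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {entry!r}")
        port_num = int(port)  # raises ValueError on a non-numeric port
        if not 0 < port_num < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        hosts.append((host, port_num))
    return hosts


from_csv = parse_hosts("host-a.example.com:20000,host-b.example.com:20001")
from_list = parse_hosts(["host-a.example.com:20000", "host-b.example.com:20001"])
print(from_csv == from_list)
```

Validating the port range at parse time surfaces configuration mistakes before any connection attempt is made.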

Async/Sync Design

ThetaDataDx uses two different concurrency models for its two data paths:

| Path | Runtime | Why |
| --- | --- | --- |
| connect() + all historical methods | async (tokio) | gRPC/tonic requires tokio for HTTP/2 multiplexing |
| start_streaming() + callbacks | sync (OS threads) | Dedicated I/O thread + LMAX Disruptor ring buffer for lowest latency |
| TypeScript EventIterator.next() | Promise (napi-rs) | Returns Promise<FpssEvent \| null> (null = timeout / drained) so Node's event loop stays unblocked during the read timeout |

What this means for your code:

  • You need a tokio runtime for connect() and any historical data call (stock_history_eod, etc.).
  • The streaming callback (FnMut(&FpssEvent)) runs on a plain OS thread with no async executor involved, which keeps executor scheduling jitter off the hot path.
  • subscribe() and unsubscribe() are synchronous — they send a command through an internal channel to the I/O thread.
text
tokio runtime
  +-- connect()          async, gRPC/tonic/HTTP2
  +-- stock_history_*()  async, gRPC streaming

std::thread (fpss-io)
  +-- TLS read loop      blocking, 50ms timeout
  +-- Disruptor publish  lock-free, zero-alloc

std::thread (fpss-ping)
  +-- PING heartbeat     100ms sleep loop

Disruptor consumer thread
  +-- your callback(FnMut(&FpssEvent))

You can safely call subscribe(spec) / unsubscribe(spec) from any thread -- the command is sent through an mpsc channel and executed by the I/O thread.
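
The command-channel pattern behind this guarantee can be sketched with a queue and a single consumer thread. This is a simplified model under stated assumptions: Python's `queue.Queue` plays the role of the mpsc channel, the spec strings are placeholders, and the real I/O thread also runs the TLS read loop, which is omitted here.

```python
import queue
import threading

commands: queue.Queue = queue.Queue()
active = set()  # touched only by the I/O thread while it is running


def io_loop() -> None:
    """Single consumer: applies commands in the order they were enqueued."""
    while True:
        op, spec = commands.get()
        if op == "stop":
            break
        if op == "subscribe":
            active.add(spec)
        elif op == "unsubscribe":
            active.discard(spec)


def subscribe(spec: str) -> None:
    commands.put(("subscribe", spec))  # safe from any thread


def unsubscribe(spec: str) -> None:
    commands.put(("unsubscribe", spec))


io_thread = threading.Thread(target=io_loop, name="fpss-io")
io_thread.start()

# Several caller threads enqueue commands concurrently.
callers = [
    threading.Thread(target=subscribe, args=(f"quote:{symbol}",))
    for symbol in ("AAPL", "MSFT", "TSLA")
]
for caller in callers:
    caller.start()
for caller in callers:
    caller.join()

unsubscribe("quote:MSFT")
commands.put(("stop", None))
io_thread.join()

print(sorted(active))  # ['quote:AAPL', 'quote:TSLA']
```

Because only the I/O thread mutates connection state, callers need no locks of their own; they only pay the cost of enqueuing a message.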

Subscribe

Every binding exposes a single polymorphic subscribe() method on the unified ThetaDataDxClient. Build a typed subscription spec by calling a topic helper — quote(), trade(), open_interest() — on a Contract, or full_trades() / full_open_interest() on a SecType, and hand the result to subscribe().

rust
// Stock quotes
client.subscribe(Contract::stock("AAPL").quote())?;

// Stock trades
client.subscribe(Contract::stock("MSFT").trade())?;

// Quotes + trades in one call
client.subscribe(Contract::stock("TSLA").all())?;

// Option quotes
let opt = Contract::option("SPY", "20261218", "600", "C")?;
client.subscribe(opt.quote())?;

// Open interest
client.subscribe(Contract::stock("AAPL").open_interest())?;

// All trades for a security type (firehose)
client.subscribe(SecType::Stock.full_trades())?;

// All open interest for a security type (firehose)
client.subscribe(SecType::Option.full_open_interest())?;
python
client.subscribe(Contract.stock("AAPL").quote())
client.subscribe(Contract.stock("MSFT").trade())
client.subscribe(Contract.stock("SPY").open_interest())
client.subscribe(SecType.Stock.full_trades())
client.subscribe(SecType.Option.full_open_interest())
cpp
// Stock quotes
client.subscribe(tdx::Contract::stock("AAPL").quote());

// Stock trades
client.subscribe(tdx::Contract::stock("MSFT").trade());

// Open interest
client.subscribe(tdx::Contract::stock("AAPL").open_interest());

// All trades for a security type (firehose)
client.subscribe(tdx::SecType::Stock.full_trades());

// All open interest for a security type
client.subscribe(tdx::SecType::Option.full_open_interest());
typescript
// Stock quotes
client.subscribe(Contract.stock('AAPL').quote());

// Stock trades
client.subscribe(Contract.stock('MSFT').trade());

// Open interest
client.subscribe(Contract.stock('AAPL').openInterest());

// All trades for a security type (firehose)
client.subscribe(SecType.Stock.fullTrades());

// All open interest for a security type
client.subscribe(SecType.Option.fullOpenInterest());

Typed Contract on Data Events

FPSS assigns integer IDs to contracts on the wire, but the SDK resolves every data event's contract before user code sees it. Quote / Trade / OpenInterest / Ohlcvc events carry a typed contract with symbol, sec_type, expiration, strike / strike_dollars, and the option side. Each field surfaces in the language-idiomatic shape:

  • Rust: Contract { symbol: String, sec_type: SecType, expiration: Option<i32>, is_call: Option<bool>, strike: Option<i32> } plus the derived accessors right() -> Option<Right> and strike_dollars() -> Option<f64>.
  • Python: event.contract exposes symbol: str, sec_type: str ("STOCK" / "OPTION" / "INDEX" / "RATE"), expiration: Optional[int], right: Optional[str] ("C" / "P"), strike_dollars: Optional[float], and the wire-level strike: Optional[int].
  • TypeScript: same field set as Python (secType: string, right: string | null, strikeDollars: number | null, strike: number | null).
  • C / C++: TdxContract { const char* symbol; int32_t sec_type; bool has_expiration; int32_t expiration; bool has_right; char right; bool has_strike; int32_t strike; }. Strike stays in the wire integer form on the C ABI for layout stability; convert to dollars with strike / 1000.0 (or use the tdx::Contract C++ wrapper's accessors).
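
The strike / 1000.0 conversion mentioned for the C ABI looks like this in practice. The helper name `strike_to_dollars` and the sample value are assumptions for the example; `None` models a contract without a strike (for instance, a stock), which corresponds to `has_strike == false` on the C struct.

```python
from typing import Optional


def strike_to_dollars(strike: Optional[int]) -> Optional[float]:
    """Convert a wire-level integer strike to dollars; None stays None."""
    if strike is None:
        return None
    return strike / 1000.0


option_strike = strike_to_dollars(450500)
stock_strike = strike_to_dollars(None)
print(option_strike, stock_strike)  # 450.5 None
```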

User code reads the symbol directly off the event without a side-table lookup.

rust
client.start_streaming(|event: &FpssEvent| {
    match event {
        // Read the typed contract directly off the event; no integer
        // ID lookup required.
        FpssEvent::Data(FpssData::Quote { contract, bid, ask, .. }) => {
            println!("{}: bid={bid:.2} ask={ask:.2}", contract.symbol);
        }
        FpssEvent::Data(FpssData::Trade { contract, price, size, .. }) => {
            println!("{}: price={price:.2} size={size}", contract.symbol);
        }
        _ => {}
    }
})?;

// Snapshot active subscriptions — useful for diagnostics dashboards.
let subs = client.active_subscriptions()?;
for (kind, contract) in subs {
    println!("  {kind:?}: {}", contract.symbol);
}
python
# Pull-iter mode: read `event.contract.symbol` directly off the
# typed event.
with client.streaming_iter() as it:
    for event in it:
        if event.kind == "quote":
            print(f"[QUOTE] {event.contract.symbol}: bid={event.bid} ask={event.ask}")
        elif event.kind == "trade":
            print(f"[TRADE] {event.contract.symbol}: price={event.price} size={event.size}")

# Snapshot active subscriptions.
for sub in client.active_subscriptions():
    print(sub)
cpp
// Pull-iter: each event carries event->quote.contract.symbol etc.
// directly — `has_expiration` / `has_right` / `has_strike` gate
// the option-only fields.

// Snapshot active subscriptions.
auto subs = client.active_subscriptions();
for (const auto& sub : subs) {
    std::cout << "  " << static_cast<int>(sub.kind)
              << ": " << sub.contract.symbol << std::endl;
}
typescript
// Each event carries the resolved typed contract directly — read
// event.contract.symbol off the event in your callback or async
// iterator loop, no side-table lookup required.

// Snapshot active subscriptions.
const subs = client.activeSubscriptions();
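
For intuition about what the SDK does on the user's behalf, the sketch below models the ID-to-contract resolution step: a table is filled from contract-assignment messages, and each wire-level data event has its integer ID swapped for the typed contract before delivery. All names here (`Resolver`, the `contract_id` key, the dict-shaped events) are hypothetical and chosen for the example; they do not describe the SDK's internal types.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Contract:
    symbol: str
    sec_type: str


class Resolver:
    """Toy ID table: filled by assignments, consulted for every data event."""

    def __init__(self) -> None:
        self._by_id: Dict[int, Contract] = {}

    def on_contract_assigned(self, contract_id: int, contract: Contract) -> None:
        self._by_id[contract_id] = contract

    def resolve(self, wire_event: dict) -> dict:
        # Replace the integer ID with the typed contract it refers to.
        contract = self._by_id[wire_event["contract_id"]]
        resolved = {k: v for k, v in wire_event.items() if k != "contract_id"}
        resolved["contract"] = contract
        return resolved


resolver = Resolver()
resolver.on_contract_assigned(7, Contract(symbol="AAPL", sec_type="STOCK"))

event = resolver.resolve({"kind": "quote", "contract_id": 7, "bid": 189.5, "ask": 189.52})
print(event["contract"].symbol, event["bid"], event["ask"])
```

With this step done before the callback or iterator sees the event, user code never needs to maintain such a table itself.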

Unsubscribe

Unsubscribe takes the same typed subscription spec as subscribe() — pass the matching quote() / trade() / open_interest() / full_trades() / full_open_interest() topic.

rust
client.unsubscribe(Contract::stock("AAPL").quote())?;
client.unsubscribe(Contract::stock("MSFT").trade())?;
client.unsubscribe(Contract::stock("AAPL").open_interest())?;
client.unsubscribe(SecType::Stock.full_trades())?;
client.unsubscribe(SecType::Option.full_open_interest())?;
python
client.unsubscribe(Contract.stock("AAPL").quote())
client.unsubscribe(Contract.stock("MSFT").trade())
client.unsubscribe(Contract.stock("SPY").open_interest())
client.unsubscribe(SecType.Stock.full_trades())
client.unsubscribe(SecType.Option.full_open_interest())
cpp
client.unsubscribe(tdx::Contract::stock("AAPL").quote());
client.unsubscribe(tdx::Contract::stock("MSFT").trade());
client.unsubscribe(tdx::Contract::stock("AAPL").open_interest());
client.unsubscribe(tdx::SecType::Stock.full_trades());
client.unsubscribe(tdx::SecType::Option.full_open_interest());
typescript
client.unsubscribe(Contract.stock('AAPL').quote());
client.unsubscribe(Contract.stock('MSFT').trade());
client.unsubscribe(Contract.stock('AAPL').openInterest());
client.unsubscribe(SecType.Stock.fullTrades());
client.unsubscribe(SecType.Option.fullOpenInterest());
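
One way to see why unsubscribe can take "the same spec" is to model a subscription spec as a plain value: two specs built from the same inputs compare equal, so a freshly constructed one is enough to cancel an earlier subscription. The `Spec` and `Registry` classes below are hypothetical illustrations of that idea and make no claim about how the SDK represents or tracks subscriptions.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass(frozen=True)
class Spec:
    topic: str             # "quote", "trade", "open_interest", ...
    sec_type: str          # "STOCK", "OPTION", ...
    symbol: Optional[str]  # None for sec-type-wide (firehose) specs


class Registry:
    """Toy subscription set keyed by spec value, not by object identity."""

    def __init__(self) -> None:
        self._active: Set[Spec] = set()

    def subscribe(self, spec: Spec) -> None:
        self._active.add(spec)

    def unsubscribe(self, spec: Spec) -> bool:
        """Return True if a matching subscription existed and was removed."""
        if spec in self._active:
            self._active.remove(spec)
            return True
        return False

    def active(self) -> List[Spec]:
        return sorted(self._active, key=lambda s: (s.sec_type, s.symbol or "", s.topic))


registry = Registry()
registry.subscribe(Spec("quote", "STOCK", "AAPL"))
registry.subscribe(Spec("trade", "STOCK", "AAPL"))
registry.subscribe(Spec("full_trades", "STOCK", None))

# A newly built, equal spec cancels the earlier quote subscription.
removed = registry.unsubscribe(Spec("quote", "STOCK", "AAPL"))
removed_again = registry.unsubscribe(Spec("quote", "STOCK", "AAPL"))
print(removed, removed_again, [s.topic for s in registry.active()])
```

Note that the quote and trade subscriptions for the same symbol are independent entries, which matches the need to pass the matching topic when unsubscribing.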

Stop Streaming

rust
client.stop_streaming();
python
client.stop_streaming()
cpp
client.stop_streaming();
// RAII also handles cleanup: the ThetaDataDxClient destructor stops streaming on drop.
typescript
client.stopStreaming();

Released under the Apache-2.0 License.